diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index 2319a77bd..b45e96e6b 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -75,8 +75,8 @@ jobs: if: ${{ always() }} run: | DIR=$(sudo find /tmp/tidb_cdc_test/ -type d -name 'cdc_data' -exec dirname {} \;) - CASE=$(basename $DIR) [ -z "$DIR" ] && exit 0 + CASE=$(basename $DIR) mkdir -p ./logs/$CASE cat $DIR/stdout.log tail -n 10 $DIR/cdc.log diff --git a/cmd/main.go b/cmd/main.go index fbfa1140b..83f887bbd 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -15,6 +15,7 @@ package main import ( "os" + "slices" "strings" "github.com/pingcap/log" @@ -48,6 +49,7 @@ func addNewArchCommandTo(cmd *cobra.Command) { } func isNewArchEnabledByConfig(serverConfigFilePath string) bool { + cfg := config.GetDefaultServerConfig() if len(serverConfigFilePath) > 0 { // strict decode config file, but ignore debug item @@ -80,6 +82,12 @@ func parseConfigFlagFromOSArgs() string { serverConfigFilePath = os.Args[i+2] } } + + // If the command is `cdc cli changefeed`, the --config flag refers to a changefeed config file, not a server config file. + if slices.Contains(os.Args, "cli") && slices.Contains(os.Args, "changefeed") { + serverConfigFilePath = "" + } + + return serverConfigFilePath } diff --git a/docs/design/2024-12-20-ticdc-flow-control.md b/docs/design/2024-12-20-ticdc-flow-control.md index 0359e4c22..344964107 100644 --- a/docs/design/2024-12-20-ticdc-flow-control.md +++ b/docs/design/2024-12-20-ticdc-flow-control.md @@ -12,7 +12,7 @@ TiCDC processes data in two main parts: The following diagram illustrates the relationship between the **data puller** and **data sinker**: -![Data Flow](./medias/flow-control-1.png) +![Data Flow](../media/flow-control-1.png) In this architecture, **EventService** and **EventCollector** facilitate communication between the two parts: diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index b9a90d528..e3cb746bc 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -334,7 +334,10 @@ func (c *EventCollector) mustSendDispatcherRequest(target node.ID, topic string, if err != nil { log.Info("failed to send dispatcher request message to event service, try again later", - zap.Stringer("target", target), + zap.String("changefeedID", req.Dispatcher.GetChangefeedID().ID().String()), + zap.Stringer("dispatcher", req.Dispatcher.GetId()), + zap.Any("target", target.String()), + zap.Any("request", req), zap.Error(err)) // Put the request back to the channel for later retry.
c.dispatcherRequestChan.In() <- DispatcherRequestWithTarget{ @@ -570,6 +573,7 @@ func (d *dispatcherStat) handleHandshakeEvent(event dispatcher.DispatcherEvent, func (d *dispatcherStat) handleReadyEvent(event dispatcher.DispatcherEvent, eventCollector *EventCollector) { d.eventServiceInfo.Lock() defer d.eventServiceInfo.Unlock() + if event.GetType() != commonEvent.TypeReadyEvent { log.Panic("should not happen") } @@ -658,7 +662,7 @@ func (d *dispatcherStat) unregisterDispatcher(eventCollector *EventCollector) { ActionType: eventpb.ActionType_ACTION_TYPE_REMOVE, }) // unregister from remote event service if have - if d.eventServiceInfo.serverID != eventCollector.serverId { + if d.eventServiceInfo.serverID != "" && d.eventServiceInfo.serverID != eventCollector.serverId { eventCollector.mustSendDispatcherRequest(d.eventServiceInfo.serverID, eventServiceTopic, DispatcherRequest{ Dispatcher: d.target, ActionType: eventpb.ActionType_ACTION_TYPE_REMOVE, diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go index 33ecdf84b..14757b279 100644 --- a/maintainer/maintainer.go +++ b/maintainer/maintainer.go @@ -34,7 +34,6 @@ import ( "github.com/pingcap/ticdc/pkg/sink/util" "github.com/pingcap/ticdc/server/watcher" "github.com/pingcap/ticdc/utils/chann" - "github.com/pingcap/ticdc/utils/dynstream" "github.com/pingcap/ticdc/utils/threadpool" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/errors" @@ -45,6 +44,10 @@ import ( "go.uber.org/zap" ) +const ( + periodEventInterval = time.Millisecond * 200 +) + // Maintainer is response for handle changefeed replication tasks. Maintainer should: // 1. schedule tables to dispatcher manager // 2. calculate changefeed checkpoint ts @@ -66,14 +69,17 @@ type Maintainer struct { eventCh *chann.DrainableChann[*Event] - stream dynstream.DynamicStream[int, common.GID, *Event, *Maintainer, *StreamHandler] taskScheduler threadpool.ThreadPool mc messaging.MessageCenter - watermark *heartbeatpb.Watermark + watermark struct { + mu sync.RWMutex + *heartbeatpb.Watermark + } + checkpointTsByCapture map[node.ID]heartbeatpb.Watermark - state heartbeatpb.ComponentState + state atomic.Int32 bootstrapper *bootstrap.Bootstrapper[heartbeatpb.MaintainerBootstrapResponse] changefeedSate model.FeedState @@ -93,7 +99,8 @@ type Maintainer struct { pdEndpoints []string nodeManager *watcher.NodeManager - nodesClosed map[node.ID]struct{} + // closedNodes is used to record the nodes that dispatcherManager is closed + closedNodes map[node.ID]struct{} statusChanged *atomic.Bool nodeChanged *atomic.Bool @@ -127,7 +134,6 @@ func NewMaintainer(cfID common.ChangeFeedID, conf *config.SchedulerConfig, cfg *config.ChangeFeedInfo, selfNode *node.Info, - stream dynstream.DynamicStream[int, common.GID, *Event, *Maintainer, *StreamHandler], taskScheduler threadpool.ThreadPool, pdAPI pdutil.PDAPIClient, tsoClient replica.TSOClient, @@ -148,26 +154,20 @@ func NewMaintainer(cfID common.ChangeFeedID, id: cfID, selfNode: selfNode, eventCh: chann.NewAutoDrainChann[*Event](), - stream: stream, taskScheduler: taskScheduler, startCheckpointTs: checkpointTs, controller: NewController(cfID, checkpointTs, pdAPI, tsoClient, regionCache, taskScheduler, cfg.Config, ddlSpan, conf.AddTableBatchSize, time.Duration(conf.CheckBalanceInterval)), mc: mc, - state: heartbeatpb.ComponentState_Working, removed: atomic.NewBool(false), nodeManager: nodeManager, - nodesClosed: make(map[node.ID]struct{}), + closedNodes: make(map[node.ID]struct{}), statusChanged: atomic.NewBool(true), nodeChanged: 
atomic.NewBool(false), cascadeRemoving: false, config: cfg, - ddlSpan: ddlSpan, - watermark: &heartbeatpb.Watermark{ - CheckpointTs: checkpointTs, - ResolvedTs: checkpointTs, - }, + ddlSpan: ddlSpan, checkpointTsByCapture: make(map[node.ID]heartbeatpb.Watermark), runningErrors: map[node.ID]*heartbeatpb.RunningError{}, @@ -181,8 +181,14 @@ func NewMaintainer(cfID common.ChangeFeedID, tableCountGauge: metrics.TableGauge.WithLabelValues(cfID.Namespace(), cfID.Name()), handleEventDuration: metrics.MaintainerHandleEventDuration.WithLabelValues(cfID.Namespace(), cfID.Name()), } + + m.watermark.Watermark = &heartbeatpb.Watermark{ + CheckpointTs: checkpointTs, + ResolvedTs: checkpointTs, + } + m.state.Store(int32(heartbeatpb.ComponentState_Working)) m.bootstrapper = bootstrap.NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse](m.id.Name(), m.getNewBootstrapFn()) - log.Info("maintainer is created", zap.String("id", cfID.String()), + log.Info("changefeed maintainer is created", zap.String("id", cfID.String()), zap.Uint64("checkpointTs", checkpointTs), zap.String("ddl dispatcher", tableTriggerEventDispatcherID.String())) metrics.MaintainerGauge.WithLabelValues(cfID.Namespace(), cfID.Name()).Inc() @@ -198,7 +204,6 @@ func NewMaintainer(cfID common.ChangeFeedID, func NewMaintainerForRemove(cfID common.ChangeFeedID, conf *config.SchedulerConfig, selfNode *node.Info, - stream dynstream.DynamicStream[int, common.GID, *Event, *Maintainer, *StreamHandler], taskScheduler threadpool.ThreadPool, pdAPI pdutil.PDAPIClient, tsoClient replica.TSOClient, @@ -209,14 +214,14 @@ func NewMaintainerForRemove(cfID common.ChangeFeedID, SinkURI: "", Config: config.GetDefaultReplicaConfig(), } - m := NewMaintainer(cfID, conf, unused, selfNode, stream, taskScheduler, pdAPI, + m := NewMaintainer(cfID, conf, unused, selfNode, taskScheduler, pdAPI, tsoClient, regionCache, 1) m.cascadeRemoving = true // setup period event - SubmitScheduledEvent(m.taskScheduler, m.stream, &Event{ + m.submitScheduledEvent(m.taskScheduler, &Event{ changefeedID: m.id, eventType: EventPeriod, - }, time.Now().Add(time.Millisecond*500)) + }, time.Now().Add(periodEventInterval)) return m } @@ -236,7 +241,7 @@ func (m *Maintainer) HandleEvent(event *Event) bool { } m.handleEventDuration.Observe(duration.Seconds()) }() - if m.state == heartbeatpb.ComponentState_Stopped { + if m.state.Load() == int32(heartbeatpb.ComponentState_Stopped) { log.Warn("maintainer is stopped, ignore", zap.String("changefeed", m.id.String())) return false @@ -265,7 +270,7 @@ func (m *Maintainer) Close() { log.Info("changefeed maintainer closed", zap.String("id", m.id.String()), zap.Bool("removed", m.removed.Load()), - zap.Uint64("checkpointTs", m.watermark.CheckpointTs)) + zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs)) } func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus { @@ -279,11 +284,12 @@ func (m *Maintainer) GetMaintainerStatus() *heartbeatpb.MaintainerStatus { } clear(m.runningErrors) } + status := &heartbeatpb.MaintainerStatus{ ChangefeedID: m.id.ToPB(), FeedState: string(m.changefeedSate), - State: m.state, - CheckpointTs: m.watermark.CheckpointTs, + State: heartbeatpb.ComponentState(m.state.Load()), + CheckpointTs: m.getWatermark().CheckpointTs, Err: runningErrors, } return status @@ -308,14 +314,14 @@ func (m *Maintainer) initialize() error { } m.sendMessages(m.bootstrapper.HandleNewNodes(newNodes)) // setup period event - SubmitScheduledEvent(m.taskScheduler, m.stream, &Event{ + m.submitScheduledEvent(m.taskScheduler, &Event{ 
changefeedID: m.id, eventType: EventPeriod, - }, time.Now().Add(time.Millisecond*500)) + }, time.Now().Add(periodEventInterval)) log.Info("changefeed maintainer initialized", zap.String("id", m.id.String()), zap.Duration("duration", time.Since(start))) - m.state = heartbeatpb.ComponentState_Working + m.state.Store(int32(heartbeatpb.ComponentState_Working)) m.statusChanged.Store(true) return nil } @@ -371,7 +377,7 @@ func (m *Maintainer) onRemoveMaintainer(cascade, changefeedRemoved bool) { closed := m.tryCloseChangefeed() if closed { m.removed.Store(true) - m.state = heartbeatpb.ComponentState_Stopped + m.state.Store(int32(heartbeatpb.ComponentState_Stopped)) metrics.MaintainerGauge.WithLabelValues(m.id.Namespace(), m.id.Name()).Dec() } } @@ -417,8 +423,8 @@ func (m *Maintainer) calCheckpointTs() { if !m.bootstrapped { log.Warn("can not advance checkpointTs since not bootstrapped", zap.String("changefeed", m.id.Name()), - zap.Uint64("checkpointTs", m.watermark.CheckpointTs), - zap.Uint64("resolvedTs", m.watermark.ResolvedTs)) + zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), + zap.Uint64("resolvedTs", m.getWatermark().ResolvedTs)) return } // make sure there is no task running @@ -429,16 +435,16 @@ func (m *Maintainer) calCheckpointTs() { if !m.controller.ScheduleFinished() { log.Warn("can not advance checkpointTs since schedule is not finished", zap.String("changefeed", m.id.Name()), - zap.Uint64("checkpointTs", m.watermark.CheckpointTs), - zap.Uint64("resolvedTs", m.watermark.ResolvedTs), + zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), + zap.Uint64("resolvedTs", m.getWatermark().ResolvedTs), ) return } if m.barrier.ShouldBlockCheckpointTs() { log.Warn("can not advance checkpointTs since barrier is blocking", zap.String("changefeed", m.id.Name()), - zap.Uint64("checkpointTs", m.watermark.CheckpointTs), - zap.Uint64("resolvedTs", m.watermark.ResolvedTs), + zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), + zap.Uint64("resolvedTs", m.getWatermark().ResolvedTs), ) return } @@ -454,32 +460,29 @@ func (m *Maintainer) calCheckpointTs() { log.Warn("checkpointTs can not be advanced, since missing capture heartbeat", zap.String("changefeed", m.id.Name()), zap.Any("node", id), - zap.Uint64("checkpointTs", m.watermark.CheckpointTs), - zap.Uint64("resolvedTs", m.watermark.ResolvedTs)) + zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs), + zap.Uint64("resolvedTs", m.getWatermark().ResolvedTs)) return } newWatermark.UpdateMin(m.checkpointTsByCapture[id]) } - if newWatermark.CheckpointTs != math.MaxUint64 { - m.watermark.CheckpointTs = newWatermark.CheckpointTs - } - if newWatermark.ResolvedTs != math.MaxUint64 { - m.watermark.ResolvedTs = newWatermark.ResolvedTs - } + + m.setWatermark(*newWatermark) } func (m *Maintainer) updateMetrics() { - phyCkpTs := oracle.ExtractPhysical(m.watermark.CheckpointTs) + watermark := m.getWatermark() + phyCkpTs := oracle.ExtractPhysical(watermark.CheckpointTs) m.changefeedCheckpointTsGauge.Set(float64(phyCkpTs)) lag := float64(oracle.GetPhysical(time.Now())-phyCkpTs) / 1e3 m.changefeedCheckpointTsLagGauge.Set(lag) - phyResolvedTs := oracle.ExtractPhysical(m.watermark.ResolvedTs) + phyResolvedTs := oracle.ExtractPhysical(watermark.ResolvedTs) m.changefeedResolvedTsGauge.Set(float64(phyResolvedTs)) lag = float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3 m.changefeedResolvedTsLagGauge.Set(lag) - m.changefeedStatusGauge.Set(float64(m.state)) + m.changefeedStatusGauge.Set(float64(m.state.Load())) } // send message to remote 
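The maintainer changes above replace the bare `watermark *heartbeatpb.Watermark` field and the plain `state heartbeatpb.ComponentState` with a mutex-guarded watermark and an atomic state, read through `getWatermark()` and `m.state.Load()`. Below is a minimal, self-contained sketch of that pattern, not the project's code: `Watermark` stands in for `heartbeatpb.Watermark`, and the methods only illustrate the locking and the MaxUint64 sentinel handling.

```go
package main

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
)

// Watermark is a stand-in for heartbeatpb.Watermark, for illustration only.
type Watermark struct {
	CheckpointTs uint64
	ResolvedTs   uint64
}

type maintainer struct {
	// state holds a component-state value; atomic so the heartbeat and
	// metrics paths can read it without taking a lock.
	state atomic.Int32

	// watermark is read by status reporting and metrics while the event
	// loop advances it, so it gets its own RWMutex.
	watermark struct {
		mu sync.RWMutex
		Watermark
	}
}

// setWatermark skips MaxUint64 components, the sentinel left behind when no
// capture contributed a smaller value during the min-aggregation.
func (m *maintainer) setWatermark(nw Watermark) {
	m.watermark.mu.Lock()
	defer m.watermark.mu.Unlock()
	if nw.CheckpointTs != math.MaxUint64 {
		m.watermark.CheckpointTs = nw.CheckpointTs
	}
	if nw.ResolvedTs != math.MaxUint64 {
		m.watermark.ResolvedTs = nw.ResolvedTs
	}
}

// getWatermark returns a copy so callers never log or compare under the lock.
func (m *maintainer) getWatermark() Watermark {
	m.watermark.mu.RLock()
	defer m.watermark.mu.RUnlock()
	return m.watermark.Watermark
}

func main() {
	m := &maintainer{}
	m.state.Store(1) // e.g. ComponentState_Working
	m.setWatermark(Watermark{CheckpointTs: 100, ResolvedTs: math.MaxUint64})
	fmt.Println(m.getWatermark(), m.state.Load()) // {100 0} 1
}
```

Returning a copy from the getter is what lets call sites such as `zap.Uint64("checkpointTs", m.getWatermark().CheckpointTs)` stay lock-free at the logging point.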
@@ -603,16 +606,15 @@ func (m *Maintainer) sendPostBootstrapRequest() { func (m *Maintainer) onMaintainerCloseResponse(from node.ID, response *heartbeatpb.MaintainerCloseResponse) { if response.Success { - m.nodesClosed[from] = struct{}{} + m.closedNodes[from] = struct{}{} + m.onRemoveMaintainer(m.cascadeRemoving, m.changefeedRemoved) } - // check if all nodes have sent response - m.onRemoveMaintainer(m.cascadeRemoving, m.changefeedRemoved) } func (m *Maintainer) handleResendMessage() { // resend closing message if m.removing { - m.sendMaintainerCloseRequestToAllNode() + m.trySendMaintainerCloseRequestToAllNode() return } // resend bootstrap message @@ -627,20 +629,23 @@ func (m *Maintainer) handleResendMessage() { } func (m *Maintainer) tryCloseChangefeed() bool { - if m.state != heartbeatpb.ComponentState_Stopped { + if m.state.Load() != int32(heartbeatpb.ComponentState_Stopped) { m.statusChanged.Store(true) } if !m.cascadeRemoving { m.controller.RemoveTasksByTableIDs(m.ddlSpan.Span.TableID) return !m.ddlSpan.IsWorking() } - return m.sendMaintainerCloseRequestToAllNode() + return m.trySendMaintainerCloseRequestToAllNode() } -func (m *Maintainer) sendMaintainerCloseRequestToAllNode() bool { +// trySendMaintainerCloseRequestToAllNode is used to send maintainer close request to all nodes +// if all nodes are closed, return true, otherwise return false. +func (m *Maintainer) trySendMaintainerCloseRequestToAllNode() bool { msgs := make([]*messaging.TargetMessage, 0) for n := range m.nodeManager.GetAliveNodes() { - if _, ok := m.nodesClosed[n]; !ok { + // Check if the node is already closed. + if _, ok := m.closedNodes[n]; !ok { msgs = append(msgs, messaging.NewSingleTargetMessage( n, messaging.DispatcherManagerManagerTopic, @@ -734,10 +739,10 @@ func (m *Maintainer) onPeriodTask() { m.handleResendMessage() m.collectMetrics() m.calCheckpointTs() - SubmitScheduledEvent(m.taskScheduler, m.stream, &Event{ + m.submitScheduledEvent(m.taskScheduler, &Event{ changefeedID: m.id, eventType: EventPeriod, - }, time.Now().Add(time.Millisecond*500)) + }, time.Now().Add(periodEventInterval)) } func (m *Maintainer) collectMetrics() { @@ -794,3 +799,42 @@ func (m *Maintainer) runHandleEvents(ctx context.Context) { func (m *Maintainer) MoveTable(tableId int64, targetNode node.ID) error { return m.controller.moveTable(tableId, targetNode) } + +// SubmitScheduledEvent submits a task to controller pool to send a future event +func (m *Maintainer) submitScheduledEvent( + scheduler threadpool.ThreadPool, + event *Event, + scheduleTime time.Time) { + task := func() time.Time { + m.pushEvent(event) + return time.Time{} + } + scheduler.SubmitFunc(task, scheduleTime) +} + +// pushEvent is used to push event to maintainer's event channel +// event will be handled by maintainer's main loop +func (m *Maintainer) pushEvent(event *Event) { + m.eventCh.In() <- event +} + +func (m *Maintainer) getWatermark() heartbeatpb.Watermark { + m.watermark.mu.RLock() + defer m.watermark.mu.RUnlock() + res := heartbeatpb.Watermark{ + CheckpointTs: m.watermark.CheckpointTs, + ResolvedTs: m.watermark.ResolvedTs, + } + return res +} + +func (m *Maintainer) setWatermark(newWatermark heartbeatpb.Watermark) { + m.watermark.mu.Lock() + defer m.watermark.mu.Unlock() + if newWatermark.CheckpointTs != math.MaxUint64 { + m.watermark.CheckpointTs = newWatermark.CheckpointTs + } + if newWatermark.ResolvedTs != math.MaxUint64 { + m.watermark.ResolvedTs = newWatermark.ResolvedTs + } +} diff --git a/maintainer/maintainer_event.go 
b/maintainer/maintainer_event.go index 0569f791d..82915d03f 100644 --- a/maintainer/maintainer_event.go +++ b/maintainer/maintainer_event.go @@ -14,12 +14,8 @@ package maintainer import ( - "time" - "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/messaging" - "github.com/pingcap/ticdc/utils/dynstream" - "github.com/pingcap/ticdc/utils/threadpool" ) const ( @@ -37,48 +33,3 @@ type Event struct { eventType int message *messaging.TargetMessage } - -func (e Event) IsBatchable() bool { - return true -} - -// SubmitScheduledEvent submits a task to controller pool to send a future event -func SubmitScheduledEvent( - scheduler threadpool.ThreadPool, - stream dynstream.DynamicStream[int, common.GID, *Event, *Maintainer, *StreamHandler], - event *Event, - scheduleTime time.Time) { - task := func() time.Time { - stream.Push(event.changefeedID.Id, event) - return time.Time{} - } - scheduler.SubmitFunc(task, scheduleTime) -} - -// StreamHandler implements the dynstream Handler, no real logic, just forward event to the maintainer -type StreamHandler struct { -} - -func NewStreamHandler() *StreamHandler { - return &StreamHandler{} -} - -func (m *StreamHandler) Path(event *Event) common.GID { - return event.changefeedID.Id -} - -func (m *StreamHandler) Handle(dest *Maintainer, events ...*Event) (await bool) { - if len(events) != 1 { - // TODO: Support batch - panic("unexpected event count") - } - event := events[0] - return dest.HandleEvent(event) -} - -func (m *StreamHandler) GetSize(event *Event) int { return 0 } -func (m *StreamHandler) GetArea(path common.GID, dest *Maintainer) int { return 0 } -func (m *StreamHandler) GetTimestamp(event *Event) dynstream.Timestamp { return 0 } -func (m *StreamHandler) GetType(event *Event) dynstream.EventType { return dynstream.DefaultEventType } -func (m *StreamHandler) IsPaused(event *Event) bool { return false } -func (m *StreamHandler) OnDrop(event *Event) {} diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index e433377bd..b3c485cb3 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -26,25 +26,18 @@ import ( appcontext "github.com/pingcap/ticdc/pkg/common/context" "github.com/pingcap/ticdc/pkg/config" "github.com/pingcap/ticdc/pkg/messaging" - "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/node" - "github.com/pingcap/ticdc/utils/dynstream" "github.com/pingcap/ticdc/utils/threadpool" "github.com/pingcap/tiflow/pkg/pdutil" "github.com/tikv/client-go/v2/tikv" "go.uber.org/zap" ) -var ( - metricsDSInputChanLen = metrics.DynamicStreamEventChanSize.WithLabelValues("maintainer-manager") - metricsDSPendingQueueLen = metrics.DynamicStreamPendingQueueLen.WithLabelValues("maintainer-manager") -) - // Manager is the manager of all changefeed maintainer in a ticdc watcher, each ticdc watcher will -// start a Manager when the watcher is startup. the Manager should: -// 1. handle bootstrap command from coordinator and return all changefeed maintainer status -// 2. handle dispatcher command from coordinator: add or remove changefeed maintainer -// 3. check maintainer liveness +// start a Manager when the watcher is startup. It responsible for: +// 1. Handle bootstrap command from coordinator and report all changefeed maintainer status. +// 2. Handle other commands from coordinator: like add or remove changefeed maintainer +// 3. 
Manage maintainers lifetime type Manager struct { mc messaging.MessageCenter conf *config.SchedulerConfig @@ -60,16 +53,14 @@ type Manager struct { tsoClient replica.TSOClient regionCache *tikv.RegionCache + // msgCh is used to cache messages from coordinator msgCh chan *messaging.TargetMessage - stream dynstream.DynamicStream[int, common.GID, *Event, *Maintainer, *StreamHandler] taskScheduler threadpool.ThreadPool } -// NewMaintainerManager create a changefeed maintainer manager instance, -// 1. manager receives bootstrap command from coordinator -// 2. manager manages maintainer lifetime -// 3. manager report maintainer status to coordinator +// NewMaintainerManager create a changefeed maintainer manager instance +// and register message handler to message center func NewMaintainerManager(selfNode *node.Info, conf *config.SchedulerConfig, pdAPI pdutil.PDAPIClient, @@ -88,8 +79,6 @@ func NewMaintainerManager(selfNode *node.Info, tsoClient: pdClient, regionCache: regionCache, } - m.stream = dynstream.NewDynamicStream(NewStreamHandler()) - m.stream.Start() mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages) mc.RegisterHandler(messaging.MaintainerTopic, @@ -141,7 +130,8 @@ func (m *Manager) Name() string { } func (m *Manager) Run(ctx context.Context) error { - ticker := time.NewTicker(time.Millisecond * 500) + reportMaintainerStatusInterval := time.Millisecond * 200 + ticker := time.NewTicker(reportMaintainerStatusInterval) defer ticker.Stop() for { select { @@ -159,18 +149,10 @@ func (m *Manager) Run(ctx context.Context) error { cf.Close() log.Info("maintainer removed, remove it from dynamic stream", zap.String("changefeed", cf.id.String())) - if err := m.stream.RemovePath(cf.id.Id); err != nil { - log.Warn("remove path from dynstream failed, will retry later", - zap.String("changefeed", cf.id.String()), - zap.Error(err)) - // try it again later - return true - } m.maintainers.Delete(key) } return true }) - m.updateMetricsOnce() } } } @@ -243,21 +225,20 @@ func (m *Manager) onAddMaintainerRequest(req *heartbeatpb.AddMaintainerRequest) zap.Uint64("checkpointTs", req.CheckpointTs), zap.Any("config", cfConfig)) } - cf := NewMaintainer(cfID, m.conf, cfConfig, m.selfNode, m.stream, m.taskScheduler, + cf := NewMaintainer(cfID, m.conf, cfConfig, m.selfNode, m.taskScheduler, m.pdAPI, m.tsoClient, m.regionCache, req.CheckpointTs) - err = m.stream.AddPath(cfID.Id, cf) if err != nil { log.Warn("add path to dynstream failed, coordinator will retry later", zap.Error(err)) return } + cf.pushEvent(&Event{changefeedID: cfID, eventType: EventInit}) m.maintainers.Store(cfID, cf) - m.stream.Push(cfID.Id, &Event{changefeedID: cfID, eventType: EventInit}) } func (m *Manager) onRemoveMaintainerRequest(msg *messaging.TargetMessage) *heartbeatpb.MaintainerStatus { req := msg.Message[0].(*heartbeatpb.RemoveMaintainerRequest) cfID := common.NewChangefeedIDFromPB(req.GetId()) - _, ok := m.maintainers.Load(cfID) + cf, ok := m.maintainers.Load(cfID) if !ok { if !req.Cascade { log.Warn("ignore remove maintainer request, "+ @@ -271,22 +252,17 @@ func (m *Manager) onRemoveMaintainerRequest(msg *messaging.TargetMessage) *heart } // it's cascade remove, we should remove the dispatcher from all node // here we create a maintainer to run the remove the dispatcher logic - cf := NewMaintainerForRemove(cfID, m.conf, m.selfNode, m.stream, m.taskScheduler, m.pdAPI, + cf = NewMaintainerForRemove(cfID, m.conf, m.selfNode, m.taskScheduler, m.pdAPI, m.tsoClient, m.regionCache) - err := m.stream.AddPath(cfID.Id, cf) - 
if err != nil { - log.Warn("add path to dynstream failed, coordinator will retry later", zap.Error(err)) - return nil - } m.maintainers.Store(cfID, cf) } - log.Info("received remove maintainer request", - zap.String("changefeed", cfID.String())) - m.stream.Push(cfID.Id, &Event{ + cf.(*Maintainer).pushEvent(&Event{ changefeedID: cfID, eventType: EventMessage, message: msg, }) + log.Info("received remove maintainer request", + zap.String("changefeed", cfID.String())) return nil } @@ -312,7 +288,7 @@ func (m *Manager) onDispatchMaintainerRequest( } func (m *Manager) sendHeartbeat() { - if m.coordinatorVersion > 0 { + if m.isBootstrap() { response := &heartbeatpb.MaintainerHeartbeat{} m.maintainers.Range(func(key, value interface{}) bool { cfMaintainer := value.(*Maintainer) @@ -336,7 +312,7 @@ func (m *Manager) handleMessage(msg *messaging.TargetMessage) { m.onCoordinatorBootstrapRequest(msg) case messaging.TypeAddMaintainerRequest, messaging.TypeRemoveMaintainerRequest: - if m.coordinatorVersion > 0 { + if m.isBootstrap() { status := m.onDispatchMaintainerRequest(msg) if status == nil { return @@ -363,11 +339,6 @@ func (m *Manager) dispatcherMaintainerMessage( case <-ctx.Done(): return ctx.Err() default: - // m.stream.Push(changefeed.Id, &Event{ - // changefeedID: changefeed, - // eventType: EventMessage, - // message: msg, - // }) c, ok := m.maintainers.Load(changefeed) if !ok { log.Warn("maintainer is not found", @@ -375,21 +346,15 @@ func (m *Manager) dispatcherMaintainerMessage( return nil } maintainer := c.(*Maintainer) - maintainer.eventCh.In() <- &Event{ + maintainer.pushEvent(&Event{ changefeedID: changefeed, eventType: EventMessage, message: msg, - } + }) } return nil } -func (m *Manager) updateMetricsOnce() { - dsMetrics := m.stream.GetMetrics() - metricsDSInputChanLen.Set(float64(dsMetrics.EventChanSize)) - metricsDSPendingQueueLen.Set(float64(dsMetrics.PendingQueueLen)) -} - func (m *Manager) GetMaintainerForChangefeed(changefeedID common.ChangeFeedID) *Maintainer { c, ok := m.maintainers.Load(changefeedID) if !ok { @@ -397,3 +362,7 @@ func (m *Manager) GetMaintainerForChangefeed(changefeedID common.ChangeFeedID) * } return c.(*Maintainer) } + +func (m *Manager) isBootstrap() bool { + return m.coordinatorVersion > 0 +} diff --git a/maintainer/maintainer_manager_test.go b/maintainer/maintainer_manager_test.go index b8c3ccb32..ba6b54794 100644 --- a/maintainer/maintainer_manager_test.go +++ b/maintainer/maintainer_manager_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/pingcap/log" "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/logservice/schemastore" "github.com/pingcap/ticdc/maintainer/replica" @@ -40,6 +41,7 @@ import ( "google.golang.org/grpc" ) +// This is a integration test for maintainer manager, it may consume a lot of time. 
// scale out/in close, add/remove tables func TestMaintainerSchedulesNodeChanges(t *testing.T) { ctx := context.Background() @@ -60,9 +62,9 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { appcontext.SetService(appcontext.SchemaStore, store) mc := messaging.NewMessageCenter(ctx, selfNode.ID, 0, config.NewDefaultMessageCenterConfig()) appcontext.SetService(appcontext.MessageCenter, mc) - startDispatcherNode(ctx, selfNode, mc, nodeManager) + startDispatcherNode(t, ctx, selfNode, mc, nodeManager) nodeManager.RegisterNodeChangeHandler(appcontext.MessageCenter, mc.OnNodeChanges) - //discard maintainer manager messages + // Discard maintainer manager messages, cuz we don't need to handle them in this test mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) error { return nil }) @@ -91,6 +93,7 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { data, err := json.Marshal(cfConfig) require.NoError(t, err) + // Case 1: Add new changefeed cfID := common.NewChangeFeedIDWithName("test") _ = mc.SendCommand(messaging.NewSingleTargetMessage(selfNode.ID, messaging.MaintainerManagerTopic, &heartbeatpb.AddMaintainerRequest{ @@ -98,16 +101,26 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { Config: data, CheckpointTs: 10, })) - time.Sleep(5 * time.Second) - value, _ := manager.maintainers.Load(cfID) + + value, ok := manager.maintainers.Load(cfID) + if !ok { + require.Eventually(t, func() bool { + value, ok = manager.maintainers.Load(cfID) + return ok + }, 20*time.Second, 200*time.Millisecond) + } + require.True(t, ok) maintainer := value.(*Maintainer) - require.Equal(t, 4, - maintainer.controller.replicationDB.GetReplicatingSize()) + require.Eventually(t, func() bool { + return maintainer.controller.replicationDB.GetReplicatingSize() == 4 + }, 20*time.Second, 200*time.Millisecond) require.Equal(t, 4, maintainer.controller.GetTaskSizeByNodeID(selfNode.ID)) - // add 2 new node + log.Info("Pass case 1: Add new changefeed") + + // Case 2: Add new nodes node2 := node.NewInfo("127.0.0.1:8400", "") mc2 := messaging.NewMessageCenter(ctx, node2.ID, 0, config.NewDefaultMessageCenterConfig()) @@ -117,9 +130,9 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { node4 := node.NewInfo("127.0.0.1:8600", "") mc4 := messaging.NewMessageCenter(ctx, node4.ID, 0, config.NewDefaultMessageCenterConfig()) - startDispatcherNode(ctx, node2, mc2, nodeManager) - dn3 := startDispatcherNode(ctx, node3, mc3, nodeManager) - dn4 := startDispatcherNode(ctx, node4, mc4, nodeManager) + startDispatcherNode(t, ctx, node2, mc2, nodeManager) + dn3 := startDispatcherNode(t, ctx, node3, mc3, nodeManager) + dn4 := startDispatcherNode(t, ctx, node4, mc4, nodeManager) // notify node changes _, _ = nodeManager.Tick(ctx, &orchestrator.GlobalReactorState{ @@ -131,18 +144,25 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { }}) time.Sleep(5 * time.Second) - require.Equal(t, 4, - maintainer.controller.replicationDB.GetReplicatingSize()) - require.Equal(t, 1, - maintainer.controller.GetTaskSizeByNodeID(selfNode.ID)) - require.Equal(t, 1, - maintainer.controller.GetTaskSizeByNodeID(node2.ID)) - require.Equal(t, 1, - maintainer.controller.GetTaskSizeByNodeID(node3.ID)) - require.Equal(t, 1, - maintainer.controller.GetTaskSizeByNodeID(node4.ID)) - - // remove 2 nodes + require.Eventually(t, func() bool { + return maintainer.controller.replicationDB.GetReplicatingSize() == 4 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return 
maintainer.controller.GetTaskSizeByNodeID(selfNode.ID) == 1 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(node2.ID) == 1 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(node3.ID) == 1 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(node4.ID) == 1 + }, 20*time.Second, 200*time.Millisecond) + + log.Info("Pass case 2: Add new nodes") + + // Case 3: Remove 2 nodes dn3.stop() dn4.stop() _, _ = nodeManager.Tick(ctx, &orchestrator.GlobalReactorState{ @@ -150,25 +170,33 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { model.CaptureID(selfNode.ID): {ID: model.CaptureID(selfNode.ID), AdvertiseAddr: selfNode.AdvertiseAddr}, model.CaptureID(node2.ID): {ID: model.CaptureID(node2.ID), AdvertiseAddr: node2.AdvertiseAddr}, }}) - time.Sleep(5 * time.Second) - require.Equal(t, 4, - maintainer.controller.replicationDB.GetReplicatingSize()) - require.Equal(t, 2, - maintainer.controller.GetTaskSizeByNodeID(selfNode.ID)) - require.Equal(t, 2, - maintainer.controller.GetTaskSizeByNodeID(node2.ID)) - // remove 2 tables + require.Eventually(t, func() bool { + return maintainer.controller.replicationDB.GetReplicatingSize() == 4 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(selfNode.ID) == 2 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(node2.ID) == 2 + }, 20*time.Second, 200*time.Millisecond) + + log.Info("Pass case 3: Remove 2 nodes") + + // Case 4: Remove 2 tables maintainer.controller.RemoveTasksByTableIDs(2, 3) - time.Sleep(5 * time.Second) - require.Equal(t, 2, - maintainer.controller.replicationDB.GetReplicatingSize()) - require.Equal(t, 1, - maintainer.controller.GetTaskSizeByNodeID(selfNode.ID)) - require.Equal(t, 1, - maintainer.controller.GetTaskSizeByNodeID(node2.ID)) + require.Eventually(t, func() bool { + return maintainer.controller.replicationDB.GetReplicatingSize() == 2 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(selfNode.ID) == 1 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(node2.ID) == 1 + }, 20*time.Second, 200*time.Millisecond) + log.Info("Pass case 4: Remove 2 tables") - // add 2 tables + // Case 5: Add 2 tables maintainer.controller.AddNewTable(commonEvent.Table{ SchemaID: 1, TableID: 5, @@ -177,30 +205,44 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { SchemaID: 1, TableID: 6, }, 3) - time.Sleep(5 * time.Second) - require.Equal(t, 4, - maintainer.controller.replicationDB.GetReplicatingSize()) - require.Equal(t, 2, - maintainer.controller.GetTaskSizeByNodeID(selfNode.ID)) - require.Equal(t, 2, - maintainer.controller.GetTaskSizeByNodeID(node2.ID)) + require.Eventually(t, func() bool { + return maintainer.controller.replicationDB.GetReplicatingSize() == 4 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(selfNode.ID) == 2 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(node2.ID) == 2 + }, 20*time.Second, 
200*time.Millisecond) + + log.Info("Pass case 5: Add 2 tables") - //close maintainer + // Case 6: Remove maintainer err = mc.SendCommand(messaging.NewSingleTargetMessage(selfNode.ID, messaging.MaintainerManagerTopic, &heartbeatpb.RemoveMaintainerRequest{Id: cfID.ToPB(), Cascade: true})) require.NoError(t, err) - time.Sleep(2 * time.Second) - require.Equal(t, heartbeatpb.ComponentState_Stopped, maintainer.state) - _, ok := manager.maintainers.Load(cfID) + time.Sleep(5 * time.Second) + + require.Eventually(t, func() bool { + return maintainer.state.Load() == int32(heartbeatpb.ComponentState_Stopped) + }, 20*time.Second, 200*time.Millisecond) + + _, ok = manager.maintainers.Load(cfID) + if ok { + require.Eventually(t, func() bool { + _, ok = manager.maintainers.Load(cfID) + return ok == false + }, 20*time.Second, 200*time.Millisecond) + } require.False(t, ok) - // manager.stream.Close() + log.Info("Pass case 6: Remove maintainer") cancel() } func TestMaintainerBootstrapWithTablesReported(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) - selfNode := node.NewInfo("127.0.0.1:18300", "") + selfNode := node.NewInfo("127.0.0.1:18301", "") nodeManager := watcher.NewNodeManager(nil, nil) appcontext.SetService(watcher.NodeManagerName, nodeManager) nodeManager.GetAliveNodes()[selfNode.ID] = selfNode @@ -216,7 +258,7 @@ func TestMaintainerBootstrapWithTablesReported(t *testing.T) { appcontext.SetService(appcontext.SchemaStore, store) mc := messaging.NewMessageCenter(ctx, selfNode.ID, 0, config.NewDefaultMessageCenterConfig()) appcontext.SetService(appcontext.MessageCenter, mc) - startDispatcherNode(ctx, selfNode, mc, nodeManager) + startDispatcherNode(t, ctx, selfNode, mc, nodeManager) nodeManager.RegisterNodeChangeHandler(appcontext.MessageCenter, mc.OnNodeChanges) //discard maintainer manager messages mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) error { @@ -271,15 +313,24 @@ func TestMaintainerBootstrapWithTablesReported(t *testing.T) { Config: data, CheckpointTs: 10, })) - time.Sleep(5 * time.Second) - value, _ := manager.maintainers.Load(cfID) + value, ok := manager.maintainers.Load(cfID) + if !ok { + require.Eventually(t, func() bool { + value, ok = manager.maintainers.Load(cfID) + return ok + }, 20*time.Second, 200*time.Millisecond) + } + require.True(t, ok) maintainer := value.(*Maintainer) - require.Equal(t, 4, - maintainer.controller.replicationDB.GetReplicatingSize()) - require.Equal(t, 4, - maintainer.controller.GetTaskSizeByNodeID(selfNode.ID)) + require.Eventually(t, func() bool { + return maintainer.controller.replicationDB.GetReplicatingSize() == 4 + }, 20*time.Second, 200*time.Millisecond) + require.Eventually(t, func() bool { + return maintainer.controller.GetTaskSizeByNodeID(selfNode.ID) == 4 + }, 20*time.Second, 200*time.Millisecond) + require.Len(t, remotedIds, 2) foundSize := 0 hasDDLDispatcher := false @@ -297,7 +348,6 @@ func TestMaintainerBootstrapWithTablesReported(t *testing.T) { } require.Equal(t, 2, foundSize) require.False(t, hasDDLDispatcher) - // manager.stream.Close() cancel() } @@ -320,7 +370,7 @@ func TestStopNotExistsMaintainer(t *testing.T) { appcontext.SetService(appcontext.SchemaStore, store) mc := messaging.NewMessageCenter(ctx, selfNode.ID, 0, config.NewDefaultMessageCenterConfig()) appcontext.SetService(appcontext.MessageCenter, mc) - startDispatcherNode(ctx, selfNode, mc, nodeManager) + startDispatcherNode(t, ctx, selfNode, mc, nodeManager) 
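The test updates above and below replace fixed `time.Sleep` waits plus one-shot assertions with `require.Eventually`, which polls a condition until a deadline and fails with a timeout otherwise. A small stand-alone sketch of that polling style follows; the atomic counter is only a placeholder for the scheduler state the real tests inspect.

```go
package maintainer

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// TestEventuallyPattern shows the polling style: instead of sleeping for a
// fixed 5s and asserting once, poll every 200ms and fail only if the
// condition never holds within 20s.
func TestEventuallyPattern(t *testing.T) {
	var replicating atomic.Int64
	go func() {
		time.Sleep(300 * time.Millisecond) // simulate asynchronous scheduling
		replicating.Store(4)
	}()

	require.Eventually(t, func() bool {
		return replicating.Load() == 4
	}, 20*time.Second, 200*time.Millisecond)
}
```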
nodeManager.RegisterNodeChangeHandler(appcontext.MessageCenter, mc.OnNodeChanges) //discard maintainer manager messages mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) error { @@ -347,8 +397,14 @@ func TestStopNotExistsMaintainer(t *testing.T) { Cascade: true, Removed: true, })) - time.Sleep(2 * time.Second) + _, ok := manager.maintainers.Load(cfID) + if ok { + require.Eventually(t, func() bool { + _, ok = manager.maintainers.Load(cfID) + return !ok + }, 20*time.Second, 200*time.Millisecond) + } require.False(t, ok) cancel() } @@ -373,7 +429,7 @@ func (d *dispatcherNode) stop() { d.cancel() } -func startDispatcherNode(ctx context.Context, +func startDispatcherNode(t *testing.T, ctx context.Context, node *node.Info, mc messaging.MessageCenter, nodeManager *watcher.NodeManager) *dispatcherNode { nodeManager.RegisterNodeChangeHandler(node.ID, mc.OnNodeChanges) ctx, cancel := context.WithCancel(ctx) @@ -384,9 +440,7 @@ func startDispatcherNode(ctx context.Context, mcs := messaging.NewMessageCenterServer(mc) proto.RegisterMessageCenterServer(grpcServer, mcs) lis, err := net.Listen("tcp", node.AdvertiseAddr) - if err != nil { - panic(err) - } + require.NoError(t, err) go func() { _ = grpcServer.Serve(lis) }() diff --git a/maintainer/maintainer_test.go b/maintainer/maintainer_test.go index 9f38f169d..bb3464aa3 100644 --- a/maintainer/maintainer_test.go +++ b/maintainer/maintainer_test.go @@ -34,7 +34,6 @@ import ( "github.com/pingcap/ticdc/pkg/metrics" "github.com/pingcap/ticdc/pkg/node" "github.com/pingcap/ticdc/server/watcher" - "github.com/pingcap/ticdc/utils/dynstream" "github.com/pingcap/ticdc/utils/threadpool" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -297,19 +296,8 @@ func TestMaintainerSchedule(t *testing.T) { nodeManager := watcher.NewNodeManager(nil, nil) appcontext.SetService(watcher.NodeManagerName, nodeManager) nodeManager.GetAliveNodes()[n.ID] = n - stream := dynstream.NewDynamicStream(NewStreamHandler()) - stream.Start() cfID := common.NewChangeFeedIDWithName("test") mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter) - mc.RegisterHandler(messaging.MaintainerManagerTopic, - func(ctx context.Context, msg *messaging.TargetMessage) error { - stream.Push(cfID.Id, &Event{ - changefeedID: cfID, - eventType: EventMessage, - message: msg, - }) - return nil - }) dispatcherManager := MockDispatcherManager(mc, n.ID) wg := &sync.WaitGroup{} @@ -328,15 +316,24 @@ func TestMaintainerSchedule(t *testing.T) { }, &config.ChangeFeedInfo{ Config: config.GetDefaultReplicaConfig(), - }, n, stream, taskScheduler, nil, tsoClient, nil, 10) - _ = stream.AddPath(cfID.Id, maintainer) + }, n, taskScheduler, nil, tsoClient, nil, 10) + + mc.RegisterHandler(messaging.MaintainerManagerTopic, + func(ctx context.Context, msg *messaging.TargetMessage) error { + maintainer.eventCh.In() <- &Event{ + changefeedID: cfID, + eventType: EventMessage, + message: msg, + } + return nil + }) // send bootstrap message maintainer.sendMessages(maintainer.bootstrapper.HandleNewNodes( []*node.Info{n}, )) // setup period event - SubmitScheduledEvent(maintainer.taskScheduler, maintainer.stream, &Event{ + maintainer.submitScheduledEvent(maintainer.taskScheduler, &Event{ changefeedID: maintainer.id, eventType: EventPeriod, }, time.Now().Add(time.Millisecond*500)) diff --git a/pkg/messaging/message_center.go b/pkg/messaging/message_center.go index 64c25295c..a4f36f930 100644 --- 
a/pkg/messaging/message_center.go +++ b/pkg/messaging/message_center.go @@ -8,6 +8,7 @@ import ( "github.com/pingcap/ticdc/pkg/node" + "github.com/pingcap/errors" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/apperror" "github.com/pingcap/ticdc/pkg/config" @@ -219,7 +220,7 @@ func (mc *messageCenter) SendCommand(msg *TargetMessage) error { target, ok := mc.remoteTargets.m[msg.To] mc.remoteTargets.RUnlock() if !ok { - return apperror.AppError{Type: apperror.ErrorTypeTargetNotFound, Reason: fmt.Sprintf("Target %s not found", msg.To)} + return errors.WithStack(apperror.AppError{Type: apperror.ErrorTypeTargetNotFound, Reason: fmt.Sprintf("Target %v not found", msg.To.String())}) } return target.sendCommand(msg) } diff --git a/pkg/messaging/target.go b/pkg/messaging/target.go index ae000b3f7..1bf0eda17 100644 --- a/pkg/messaging/target.go +++ b/pkg/messaging/target.go @@ -179,13 +179,14 @@ func newRemoteMessageTarget( // close stops the grpc stream and the goroutine spawned by remoteMessageTarget. func (s *remoteMessageTarget) close() { - log.Info("Close remote target", zap.Any("messageCenterID", s.messageCenterID), zap.Any("remote", s.targetId), zap.Any("addr", s.targetAddr)) + log.Info("Closing remote target", zap.Any("messageCenterID", s.messageCenterID), zap.Any("remote", s.targetId), zap.Any("addr", s.targetAddr)) if s.conn != nil { s.conn.Close() s.conn = nil } s.cancel() s.wg.Wait() + log.Info("Close remote target done", zap.Any("messageCenterID", s.messageCenterID), zap.Any("remote", s.targetId)) } func (s *remoteMessageTarget) runHandleErr(ctx context.Context) { diff --git a/tests/integration_tests/_utils/check_logs b/tests/integration_tests/_utils/check_logs index 99aa96013..afbfd9587 100755 --- a/tests/integration_tests/_utils/check_logs +++ b/tests/integration_tests/_utils/check_logs @@ -4,20 +4,6 @@ WORK_DIR=$1 set +e -## check cdc state checker log -if [ ! -f $WORK_DIR/cdc_etcd_check.log ]; then - exit 0 -fi - -grep -q -i test-case-failed $WORK_DIR/cdc_etcd_check.log - -if [ $? -eq 0 ]; then - echo "cdc state checker failed" - exit 1 -else - exit 0 -fi - ## check data race if [ ! -f $WORK_DIR/stdout.log ]; then exit 0 @@ -26,7 +12,7 @@ fi grep -q -i 'DATA RACE' $WORK_DIR/stdout.log if [ $? 
-eq 0 ]; then - echo "found DATA RACE" + echo "found DATA RACE, please check the logs" exit 1 else exit 0 diff --git a/utils/dynstream/parallel_dynamic_stream.go b/utils/dynstream/parallel_dynamic_stream.go index c28385670..a9fd7ea4a 100644 --- a/utils/dynstream/parallel_dynamic_stream.go +++ b/utils/dynstream/parallel_dynamic_stream.go @@ -3,6 +3,7 @@ package dynstream import ( "reflect" "sync" + "sync/atomic" "time" "unsafe" @@ -25,8 +26,8 @@ type parallelDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D feedbackChan chan Feedback[A, P, D] - _statAddPathCount int - _statRemovePathCount int + _statAddPathCount atomic.Int64 + _statRemovePathCount atomic.Int64 } func newParallelDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](hasher PathHasher[P], handler H, option Option) *parallelDynamicStream[A, P, T, D, H] { @@ -129,7 +130,7 @@ func (s *parallelDynamicStream[A, P, T, D, H]) AddPath(path P, dest D, as ...Are pi.stream.in() <- eventWrap[A, P, T, D, H]{pathInfo: pi, newPath: true} - s._statAddPathCount++ + s._statAddPathCount.Add(1) return nil } @@ -150,7 +151,7 @@ func (s *parallelDynamicStream[A, P, T, D, H]) RemovePath(path P) error { pi.stream.in() <- eventWrap[A, P, T, D, H]{pathInfo: pi} delete(s.pathMap, path) - s._statRemovePathCount++ + s._statRemovePathCount.Add(1) return nil } @@ -166,8 +167,8 @@ func (s *parallelDynamicStream[A, P, T, D, H]) GetMetrics() Metrics { size := ds.getPendingSize() metrics.PendingQueueLen += size } - metrics.AddPath = s._statAddPathCount - metrics.RemovePath = s._statRemovePathCount + metrics.AddPath = int(s._statAddPathCount.Load()) + metrics.RemovePath = int(s._statRemovePathCount.Load()) if s.memControl != nil { usedMemory, maxMemory := s.memControl.getMetrics()
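The dynstream change above turns `_statAddPathCount` and `_statRemovePathCount` into `atomic.Int64` so that `AddPath`/`RemovePath` and `GetMetrics` can run on different goroutines without a data race. A minimal sketch of that counter pattern, with illustrative names rather than the real struct:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type pathStats struct {
	addPathCount    atomic.Int64
	removePathCount atomic.Int64
}

func (s *pathStats) addPath()    { s.addPathCount.Add(1) }
func (s *pathStats) removePath() { s.removePathCount.Add(1) }

// snapshot is safe to call while other goroutines keep adding or removing paths.
func (s *pathStats) snapshot() (added, removed int) {
	return int(s.addPathCount.Load()), int(s.removePathCount.Load())
}

func main() {
	var (
		stats pathStats
		wg    sync.WaitGroup
	)
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			stats.addPath()
		}()
	}
	wg.Wait()
	added, removed := stats.snapshot()
	fmt.Println(added, removed) // 100 0
}
```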