diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index c7545cdb6..ceb6da125 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -432,9 +432,9 @@ func (c *EventCollector) runProcessMessage(ctx context.Context, inCh <-chan *mes event.Event = e c.ds.Push(e.DispatcherID, event) } - c.metricDispatcherReceivedResolvedTsEventCount.Add(float64(len(events))) + c.metricDispatcherReceivedResolvedTsEventCount.Add(float64(event.Len())) default: - c.metricDispatcherReceivedKVEventCount.Inc() + c.metricDispatcherReceivedKVEventCount.Add(float64(event.Len())) c.ds.Push(event.GetDispatcherID(), dispatcher.NewDispatcherEvent(&targetMessage.From, event)) } default: diff --git a/maintainer/replica/checker.go b/maintainer/replica/checker.go index 76c67e67a..f93c50958 100644 --- a/maintainer/replica/checker.go +++ b/maintainer/replica/checker.go @@ -127,7 +127,7 @@ func (s *hotSpanChecker) UpdateStatus(span *SpanReplication) { return } - log.Debug("hotSpanChecker EventSizePerSecond", zap.Any("span", span.Span), zap.Any("dispatcher", span.ID), zap.Any("EventSizePerSecond", status.EventSizePerSecond), zap.Any("writeThreshold", s.writeThreshold)) + log.Debug("hotSpanChecker EventSizePerSecond", zap.Any("changefeed", span.ChangefeedID.Name()), zap.Any("span", span.Span), zap.Any("dispatcher", span.ID), zap.Any("EventSizePerSecond", status.EventSizePerSecond), zap.Any("writeThreshold", s.writeThreshold)) if status.EventSizePerSecond != 0 && status.EventSizePerSecond < s.writeThreshold { if hotSpan, ok := s.hotTasks[span.ID]; ok { diff --git a/metrics/grafana/ticdc_new_arch.json b/metrics/grafana/ticdc_new_arch.json index d4d3027e5..0096cb0db 100644 --- a/metrics/grafana/ticdc_new_arch.json +++ b/metrics/grafana/ticdc_new_arch.json @@ -413,7 +413,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Sink Event Count / s", + "title": "Sink Event Row Count / s", "tooltip": { "shared": true, "sort": 2, @@ -1666,7 +1666,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Puller output events", + "title": "Puller output event rows", "tooltip": { "shared": true, "sort": 0, @@ -1769,7 +1769,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "EventService output events / s", + "title": "EventService output event row / s", "tooltip": { "shared": true, "sort": 0, @@ -1874,7 +1874,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "EventService output events", + "title": "EventService output event rows", "tooltip": { "shared": true, "sort": 0, @@ -1978,7 +1978,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Event Collector received event / s", + "title": "Event Collector received event rows / s", "tooltip": { "shared": true, "sort": 2, @@ -2081,7 +2081,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Event collector Received events", + "title": "Event collector Received event rows", "tooltip": { "shared": true, "sort": 0, @@ -16691,7 +16691,7 @@ "timeFrom": null, "timeRegions": [], "timeShift": null, - "title": "Puller output events / s", + "title": "Puller output event rows / s", "tooltip": { "shared": true, "sort": 0, diff --git a/pkg/common/event/ddl_event.go b/pkg/common/event/ddl_event.go index 1196af4b3..2b76aacb1 100644 --- a/pkg/common/event/ddl_event.go +++ b/pkg/common/event/ddl_event.go @@ -344,6 +344,10 @@ func (t *DDLEvent) IsPaused() bool { return t.State.IsPaused() } +func (t *DDLEvent) Len() int32 { + return 1 +} + type SchemaTableName struct { SchemaName string TableName string diff --git a/pkg/common/event/handshake_event.go b/pkg/common/event/handshake_event.go index d0567d1aa..3effec82f 100644 --- a/pkg/common/event/handshake_event.go +++ b/pkg/common/event/handshake_event.go @@ -81,6 +81,10 @@ func (e *HandshakeEvent) IsPaused() bool { return e.State.IsPaused() } +func (e *HandshakeEvent) Len() int32 { + return 0 +} + func (e HandshakeEvent) Marshal() ([]byte, error) { return e.encode() } diff --git a/pkg/common/event/interface.go b/pkg/common/event/interface.go index 74a7be353..ca9c00680 100644 --- a/pkg/common/event/interface.go +++ b/pkg/common/event/interface.go @@ -29,6 +29,8 @@ type Event interface { // It's used for memory control and monitoring. GetSize() int64 IsPaused() bool + // GetLen returns the number of rows in the event. + Len() int32 } // FlushableEvent is an event that can be flushed to downstream by a dispatcher. diff --git a/pkg/common/event/ready_event.go b/pkg/common/event/ready_event.go index cccd71dbd..6543f42b4 100644 --- a/pkg/common/event/ready_event.go +++ b/pkg/common/event/ready_event.go @@ -73,6 +73,10 @@ func (e *ReadyEvent) IsPaused() bool { return false } +func (e *ReadyEvent) Len() int32 { + return 0 +} + func (e ReadyEvent) Marshal() ([]byte, error) { return e.encode() } diff --git a/pkg/common/event/resolved_ts_event.go b/pkg/common/event/resolved_ts_event.go index 7948657d1..6667d8d21 100644 --- a/pkg/common/event/resolved_ts_event.go +++ b/pkg/common/event/resolved_ts_event.go @@ -52,6 +52,11 @@ func (b *BatchResolvedEvent) GetSeq() uint64 { return 0 } +func (b *BatchResolvedEvent) Len() int32 { + // Return the length of events. + return int32(len(b.Events)) +} + func (b *BatchResolvedEvent) Marshal() ([]byte, error) { if len(b.Events) == 0 { return nil, nil @@ -136,6 +141,10 @@ func (e ResolvedEvent) GetSeq() uint64 { return 0 } +func (e ResolvedEvent) Len() int32 { + return 1 +} + func (e ResolvedEvent) Marshal() ([]byte, error) { return e.encode() } diff --git a/pkg/common/event/sync_point_event.go b/pkg/common/event/sync_point_event.go index 4dcc80aee..8312e06a0 100644 --- a/pkg/common/event/sync_point_event.go +++ b/pkg/common/event/sync_point_event.go @@ -102,3 +102,7 @@ func (e *SyncPointEvent) PushFrontFlushFunc(f func()) { func (e *SyncPointEvent) ClearPostFlushFunc() { e.PostTxnFlushed = e.PostTxnFlushed[:0] } + +func (e *SyncPointEvent) Len() int32 { + return 0 +}