Skip to content

Commit

Permalink
metrics: unify the metrics in dataflow to rows/s (#1010)
Browse files Browse the repository at this point in the history
  • Loading branch information
hongyunyan authored Feb 18, 2025
1 parent 60a2dbb commit 6cd203d
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 10 deletions.
4 changes: 2 additions & 2 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,9 @@ func (c *EventCollector) runProcessMessage(ctx context.Context, inCh <-chan *mes
event.Event = e
c.ds.Push(e.DispatcherID, event)
}
c.metricDispatcherReceivedResolvedTsEventCount.Add(float64(len(events)))
c.metricDispatcherReceivedResolvedTsEventCount.Add(float64(event.Len()))
default:
c.metricDispatcherReceivedKVEventCount.Inc()
c.metricDispatcherReceivedKVEventCount.Add(float64(event.Len()))
c.ds.Push(event.GetDispatcherID(), dispatcher.NewDispatcherEvent(&targetMessage.From, event))
}
default:
Expand Down
2 changes: 1 addition & 1 deletion maintainer/replica/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s *hotSpanChecker) UpdateStatus(span *SpanReplication) {
return
}

log.Debug("hotSpanChecker EventSizePerSecond", zap.Any("span", span.Span), zap.Any("dispatcher", span.ID), zap.Any("EventSizePerSecond", status.EventSizePerSecond), zap.Any("writeThreshold", s.writeThreshold))
log.Debug("hotSpanChecker EventSizePerSecond", zap.Any("changefeed", span.ChangefeedID.Name()), zap.Any("span", span.Span), zap.Any("dispatcher", span.ID), zap.Any("EventSizePerSecond", status.EventSizePerSecond), zap.Any("writeThreshold", s.writeThreshold))

if status.EventSizePerSecond != 0 && status.EventSizePerSecond < s.writeThreshold {
if hotSpan, ok := s.hotTasks[span.ID]; ok {
Expand Down
14 changes: 7 additions & 7 deletions metrics/grafana/ticdc_new_arch.json
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Sink Event Count / s",
"title": "Sink Event Row Count / s",
"tooltip": {
"shared": true,
"sort": 2,
Expand Down Expand Up @@ -1666,7 +1666,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Puller output events",
"title": "Puller output event rows",
"tooltip": {
"shared": true,
"sort": 0,
Expand Down Expand Up @@ -1769,7 +1769,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "EventService output events / s",
"title": "EventService output event row / s",
"tooltip": {
"shared": true,
"sort": 0,
Expand Down Expand Up @@ -1874,7 +1874,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "EventService output events",
"title": "EventService output event rows",
"tooltip": {
"shared": true,
"sort": 0,
Expand Down Expand Up @@ -1978,7 +1978,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Event Collector received event / s",
"title": "Event Collector received event rows / s",
"tooltip": {
"shared": true,
"sort": 2,
Expand Down Expand Up @@ -2081,7 +2081,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Event collector Received events",
"title": "Event collector Received event rows",
"tooltip": {
"shared": true,
"sort": 0,
Expand Down Expand Up @@ -16691,7 +16691,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Puller output events / s",
"title": "Puller output event rows / s",
"tooltip": {
"shared": true,
"sort": 0,
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/event/ddl_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ func (t *DDLEvent) IsPaused() bool {
return t.State.IsPaused()
}

func (t *DDLEvent) Len() int32 {
return 1
}

type SchemaTableName struct {
SchemaName string
TableName string
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/event/handshake_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (e *HandshakeEvent) IsPaused() bool {
return e.State.IsPaused()
}

func (e *HandshakeEvent) Len() int32 {
return 0
}

func (e HandshakeEvent) Marshal() ([]byte, error) {
return e.encode()
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/common/event/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type Event interface {
// It's used for memory control and monitoring.
GetSize() int64
IsPaused() bool
// GetLen returns the number of rows in the event.
Len() int32
}

// FlushableEvent is an event that can be flushed to downstream by a dispatcher.
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/event/ready_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (e *ReadyEvent) IsPaused() bool {
return false
}

func (e *ReadyEvent) Len() int32 {
return 0
}

func (e ReadyEvent) Marshal() ([]byte, error) {
return e.encode()
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/common/event/resolved_ts_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ func (b *BatchResolvedEvent) GetSeq() uint64 {
return 0
}

func (b *BatchResolvedEvent) Len() int32 {
// Return the length of events.
return int32(len(b.Events))
}

func (b *BatchResolvedEvent) Marshal() ([]byte, error) {
if len(b.Events) == 0 {
return nil, nil
Expand Down Expand Up @@ -136,6 +141,10 @@ func (e ResolvedEvent) GetSeq() uint64 {
return 0
}

func (e ResolvedEvent) Len() int32 {
return 1
}

func (e ResolvedEvent) Marshal() ([]byte, error) {
return e.encode()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/event/sync_point_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,7 @@ func (e *SyncPointEvent) PushFrontFlushFunc(f func()) {
func (e *SyncPointEvent) ClearPostFlushFunc() {
e.PostTxnFlushed = e.PostTxnFlushed[:0]
}

func (e *SyncPointEvent) Len() int32 {
return 0
}

0 comments on commit 6cd203d

Please sign in to comment.