Skip to content

Commit

Permalink
maintainer: Add scheduler metrics (#127)
Browse files Browse the repository at this point in the history
* add metrics

* fix

* bump local chann
  • Loading branch information
CharlesCheung96 authored Jul 25, 2024
1 parent 820795f commit eba9469
Show file tree
Hide file tree
Showing 7 changed files with 358 additions and 22 deletions.
7 changes: 7 additions & 0 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
appcontext "github.com/flowbehappy/tigate/pkg/common/context"
"github.com/flowbehappy/tigate/pkg/common/server"
"github.com/flowbehappy/tigate/pkg/messaging"
"github.com/flowbehappy/tigate/pkg/metrics"
"github.com/flowbehappy/tigate/rpc"
"github.com/flowbehappy/tigate/scheduler"
"github.com/flowbehappy/tigate/utils"
Expand Down Expand Up @@ -52,6 +53,7 @@ type coordinator struct {
lastState *orchestrator.GlobalReactorState

lastSaveTime time.Time
lastTickTime time.Time
scheduledChangefeeds map[model.ChangeFeedID]*changefeed
}

Expand All @@ -60,6 +62,7 @@ func NewCoordinator(capture *model.CaptureInfo, version int64) server.Coordinato
version: version,
nodeInfo: capture,
scheduledChangefeeds: make(map[model.ChangeFeedID]*changefeed),
lastTickTime: time.Now(),
}
c.supervisor = scheduler.NewSupervisor(
scheduler.ChangefeedID(model.DefaultChangeFeedID("coordinator")),
Expand Down Expand Up @@ -91,6 +94,10 @@ func (c *coordinator) Tick(
state := rawState.(*orchestrator.GlobalReactorState)
c.lastState = state

now := time.Now()
metrics.CoordinatorCounter.Add(float64(now.Sub(c.lastTickTime)) / float64(time.Second))
c.lastTickTime = now

// 1. handle grpc messages
err := c.handleMessages()
if err != nil {
Expand Down
67 changes: 46 additions & 21 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
appcontext "github.com/flowbehappy/tigate/pkg/common/context"
appctx "github.com/flowbehappy/tigate/pkg/common/context"
"github.com/flowbehappy/tigate/pkg/messaging"
"github.com/flowbehappy/tigate/pkg/metrics"
"github.com/flowbehappy/tigate/rpc"
"github.com/flowbehappy/tigate/scheduler"
"github.com/flowbehappy/tigate/server/watcher"
Expand All @@ -38,6 +39,8 @@ import (
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/atomic"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -87,6 +90,13 @@ type Maintainer struct {
errLock sync.Mutex
runningErrors map[messaging.ServerId]*heartbeatpb.RunningError
runningWarnings map[messaging.ServerId]*heartbeatpb.RunningError

changefeedCheckpointTsGauge prometheus.Gauge
changefeedCheckpointTsLagGauge prometheus.Gauge
changefeedStatusGauge prometheus.Gauge
scheduleredTaskGuage prometheus.Gauge
runningTaskGuage prometheus.Gauge
tableCountGauge prometheus.Gauge
}

// NewMaintainer create the maintainer for the changefeed
Expand Down Expand Up @@ -115,6 +125,13 @@ func NewMaintainer(cfID model.ChangeFeedID,
msgBuf: make([]*messaging.TargetMessage, 1024),
runningErrors: map[messaging.ServerId]*heartbeatpb.RunningError{},
runningWarnings: map[messaging.ServerId]*heartbeatpb.RunningError{},

changefeedCheckpointTsGauge: metrics.ChangefeedCheckpointTsGauge.WithLabelValues(cfID.Namespace, cfID.ID),
changefeedCheckpointTsLagGauge: metrics.ChangefeedCheckpointTsLagGauge.WithLabelValues(cfID.Namespace, cfID.ID),
changefeedStatusGauge: metrics.ChangefeedStatusGauge.WithLabelValues(cfID.Namespace, cfID.ID),
scheduleredTaskGuage: metrics.ScheduleTaskGuage.WithLabelValues(cfID.Namespace, cfID.ID),
runningTaskGuage: metrics.RunningScheduleTaskGauge.WithLabelValues(cfID.Namespace, cfID.ID),
tableCountGauge: metrics.TableGauge.WithLabelValues(cfID.Namespace, cfID.ID),
}
if !isSecondary {
m.state = heartbeatpb.ComponentState_Working
Expand All @@ -129,6 +146,7 @@ func NewMaintainer(cfID model.ChangeFeedID,
}

func (m *Maintainer) Execute() (taskStatus threadpool.TaskStatus, tick time.Time) {
m.updateMetrics()
if m.removed.Load() {
// removed, cancel the task
return threadpool.Done, time.Time{}
Expand Down Expand Up @@ -254,6 +272,16 @@ func (m *Maintainer) calCheckpointTs() {
}
}

func (m *Maintainer) updateMetrics() {
phyCkpTs := oracle.ExtractPhysical(m.checkpointTs.Load())
m.changefeedCheckpointTsGauge.Set(float64(phyCkpTs))

lag := oracle.GetPhysical(time.Now()) - phyCkpTs
m.changefeedCheckpointTsLagGauge.Set(float64(lag))

m.changefeedStatusGauge.Set(float64(m.state))
}

// send message to remote, todo: use a io thread pool
func (m *Maintainer) sendMessages(msgs []rpc.Message) {
for _, msg := range msgs {
Expand Down Expand Up @@ -516,40 +544,37 @@ func (m *Maintainer) getMessageQueue() *MessageQueue {

func (m *Maintainer) printStatus() {
if time.Since(m.lastPrintStatusTime) > time.Second*120 {
workingTask := 0
prepareTask := 0
absentTask := 0
commitTask := 0
removingTask := 0
tableStates := make(map[scheduler.SchedulerStatus]int)
for i := 0; i <= 5; i++ {
tableStates[scheduler.SchedulerStatus(i)] = 0
}
// var taskDistribution string
m.supervisor.StateMachines.Ascend(func(key scheduler.InferiorID, value *scheduler.StateMachine) bool {
switch value.State {
case scheduler.SchedulerStatusAbsent:
absentTask++
case scheduler.SchedulerStatusPrepare:
prepareTask++
case scheduler.SchedulerStatusCommit:
commitTask++
case scheduler.SchedulerStatusWorking:
workingTask++
case scheduler.SchedulerStatusRemoving:
removingTask++
if _, ok := tableStates[value.State]; !ok {
tableStates[value.State] = 0
}
tableStates[value.State]++
// span := key.(*common.TableSpan)
// taskDistribution = fmt.Sprintf("%s, %d==>%s", taskDistribution, span.TableID, value.Primary)
return true
})

m.tableCountGauge.Set(float64(m.tableSpans.Len()))
m.scheduleredTaskGuage.Set(float64(m.supervisor.GetInferiors().Len()))
for state, count := range tableStates {
metrics.TableStateGauge.WithLabelValues(m.id.Namespace, m.id.ID, state.String()).Set(float64(count))
}

log.Info("table span status",
// zap.String("distribution", taskDistribution),
zap.String("changefeed", m.id.ID),
zap.Int("total", m.tableSpans.Len()),
zap.Int("scheduled", m.supervisor.GetInferiors().Len()),
zap.Int("absent", absentTask),
zap.Int("prepare", prepareTask),
zap.Int("commit", commitTask),
zap.Int("working", workingTask),
zap.Int("removing", removingTask),
zap.Int("absent", tableStates[scheduler.SchedulerStatusAbsent]),
zap.Int("prepare", tableStates[scheduler.SchedulerStatusPrepare]),
zap.Int("commit", tableStates[scheduler.SchedulerStatusCommit]),
zap.Int("working", tableStates[scheduler.SchedulerStatusWorking]),
zap.Int("removing", tableStates[scheduler.SchedulerStatusRemoving]),
zap.Int("runningTasks", m.supervisor.RunningTasks.Len()))
m.lastPrintStatusTime = time.Now()
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/messging.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package config

const (
// size of channel to cache the messages to be sent and received
defaultCacheSize = 1024
defaultCacheSize = 102400
)

type MessageCenterConfig struct {
Expand Down
105 changes: 105 additions & 0 deletions pkg/metrics/changefeed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import (
"github.com/prometheus/client_golang/prometheus"
)

var (
ChangefeedBarrierTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "barrier_ts",
Help: "barrier ts of changefeeds",
}, []string{"namespace", "changefeed"})

ChangefeedCheckpointTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "checkpoint_ts",
Help: "checkpoint ts of changefeeds",
}, []string{"namespace", "changefeed"})

ChangefeedCheckpointTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "checkpoint_ts_lag",
Help: "checkpoint ts lag of changefeeds in seconds",
}, []string{"namespace", "changefeed"})

CurrentPDTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "current_pd_ts",
Help: "The current PD ts",
}, []string{"namespace", "changefeed"})

ChangefeedResolvedTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "resolved_ts",
Help: "resolved ts of changefeeds",
}, []string{"namespace", "changefeed"})
ChangefeedResolvedTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "resolved_ts_lag",
Help: "resolved ts lag of changefeeds in seconds",
}, []string{"namespace", "changefeed"})

CoordinatorCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "ownership_counter",
Help: "The counter of ownership increases every 5 seconds on a owner capture",
})

ChangefeedStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "status",
Help: "The status of changefeeds",
}, []string{"namespace", "changefeed"})

ChangefeedTickDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "changefeed_tick_duration",
Help: "Bucketed histogram of owner tick changefeed reactor time (s).",
Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
}, []string{"namespace", "changefeed"})
)

// InitMetrics registers all metrics used in owner
func InitChangefeedMetrics(registry *prometheus.Registry) {
registry.MustRegister(ChangefeedBarrierTsGauge)
registry.MustRegister(ChangefeedCheckpointTsGauge)
registry.MustRegister(ChangefeedCheckpointTsLagGauge)
registry.MustRegister(ChangefeedResolvedTsGauge)
registry.MustRegister(ChangefeedResolvedTsLagGauge)
registry.MustRegister(CurrentPDTsGauge)
registry.MustRegister(CoordinatorCounter)
registry.MustRegister(ChangefeedStatusGauge)
registry.MustRegister(ChangefeedTickDuration)
}
21 changes: 21 additions & 0 deletions pkg/metrics/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package metrics

import "github.com/prometheus/client_golang/prometheus"

func InitMetrics(registry *prometheus.Registry) {
InitSchedulerMetrics(registry)
InitChangefeedMetrics(registry)
}
Loading

0 comments on commit eba9469

Please sign in to comment.