Skip to content

Commit

Permalink
vtorc: Switch to Vitess stats (vitessio#15948) (#539)
Browse files Browse the repository at this point in the history
Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>
Co-authored-by: Dirkjan Bussink <d.bussink@gmail.com>
  • Loading branch information
timvaillancourt and dbussink authored Oct 23, 2024
1 parent 33a4559 commit c6ee558
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 60 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ require (
github.com/planetscale/vtprotobuf v0.5.0
github.com/prometheus/client_golang v1.19.0
github.com/prometheus/common v0.49.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/sjmudd/stopwatch v0.1.1
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.8.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,6 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE=
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/richardartoul/molecule v1.0.1-0.20221107223329-32cfee06a052 h1:Qp27Idfgi6ACvFQat5+VJvlYToylpM/hcyLBI3WaKPA=
Expand Down
18 changes: 7 additions & 11 deletions go/vt/vtorc/inst/analysis_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,27 @@ import (
"fmt"
"time"

"vitess.io/vitess/go/vt/external/golib/sqlutils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo/topoproto"

"github.com/patrickmn/go-cache"
"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/external/golib/sqlutils"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtctl/reparentutil"
"vitess.io/vitess/go/vt/vtorc/config"
"vitess.io/vitess/go/vt/vtorc/db"
"vitess.io/vitess/go/vt/vtorc/process"
"vitess.io/vitess/go/vt/vtorc/util"

"github.com/patrickmn/go-cache"
"github.com/rcrowley/go-metrics"
)

var analysisChangeWriteCounter = metrics.NewCounter()
var analysisChangeWriteCounter = stats.NewCounter("analysis.change.write", "Number of times analysis has changed")

var recentInstantAnalysis *cache.Cache

func init() {
_ = metrics.Register("analysis.change.write", analysisChangeWriteCounter)

go initializeAnalysisDaoPostConfiguration()
}

Expand Down Expand Up @@ -748,7 +744,7 @@ func auditInstanceAnalysisInChangelog(tabletAlias string, analysisCode AnalysisC
tabletAlias, string(analysisCode),
)
if err == nil {
analysisChangeWriteCounter.Inc(1)
analysisChangeWriteCounter.Add(1)
} else {
log.Error(err)
}
Expand Down
7 changes: 3 additions & 4 deletions go/vt/vtorc/inst/analysis_dao_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"time"

"github.com/patrickmn/go-cache"
"github.com/rcrowley/go-metrics"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/external/golib/sqlutils"
Expand Down Expand Up @@ -864,7 +863,7 @@ func TestAuditInstanceAnalysisInChangelog(t *testing.T) {
oldAnalysisChangeWriteCounter := analysisChangeWriteCounter

recentInstantAnalysis = cache.New(tt.cacheExpiration, 100*time.Millisecond)
analysisChangeWriteCounter = metrics.NewCounter()
before := analysisChangeWriteCounter.Get()

defer func() {
// Set the old values back.
Expand All @@ -877,7 +876,7 @@ func TestAuditInstanceAnalysisInChangelog(t *testing.T) {
updates := []struct {
tabletAlias string
analysisCode AnalysisCode
writeCounterExpectation int
writeCounterExpectation int64
wantErr string
}{
{
Expand Down Expand Up @@ -908,7 +907,7 @@ func TestAuditInstanceAnalysisInChangelog(t *testing.T) {
continue
}
require.NoError(t, err)
require.EqualValues(t, upd.writeCounterExpectation, analysisChangeWriteCounter.Count())
require.EqualValues(t, upd.writeCounterExpectation, analysisChangeWriteCounter.Get()-before)
}
})
}
Expand Down
11 changes: 3 additions & 8 deletions go/vt/vtorc/inst/audit_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,13 @@ import (
"os"
"time"

"github.com/rcrowley/go-metrics"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vtorc/config"
"vitess.io/vitess/go/vt/vtorc/db"
)

var auditOperationCounter = metrics.NewCounter()

func init() {
_ = metrics.Register("audit.write", auditOperationCounter)
}
var auditOperationCounter = stats.NewCounter("audit.write", "Number of audit operations performed")

// AuditOperation creates and writes a new audit entry by given params
func AuditOperation(auditType string, tabletAlias string, message string) error {
Expand Down Expand Up @@ -86,7 +81,7 @@ func AuditOperation(auditType string, tabletAlias string, message string) error
if !auditWrittenToFile {
log.Infof(logMessage)
}
auditOperationCounter.Inc(1)
auditOperationCounter.Add(1)

return nil
}
Expand Down
15 changes: 4 additions & 11 deletions go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"time"

"github.com/patrickmn/go-cache"
"github.com/rcrowley/go-metrics"
"github.com/sjmudd/stopwatch"

"vitess.io/vitess/go/mysql/replication"
Expand Down Expand Up @@ -59,10 +58,8 @@ var (
var forgetAliases *cache.Cache

var (
accessDeniedCounter = metrics.NewCounter()
readTopologyInstanceCounter = metrics.NewCounter()
readInstanceCounter = metrics.NewCounter()
writeInstanceCounter = metrics.NewCounter()
readTopologyInstanceCounter = stats.NewCounter("instance.read_topology", "Number of times an instance was read from the topology")
readInstanceCounter = stats.NewCounter("instance.read", "Number of times an instance was read")
backendWrites = collection.CreateOrReturnCollection("BACKEND_WRITES")
writeBufferLatency = stopwatch.NewNamedStopwatch()
)
Expand All @@ -73,10 +70,6 @@ var (
)

func init() {
_ = metrics.Register("instance.access_denied", accessDeniedCounter)
_ = metrics.Register("instance.read_topology", readTopologyInstanceCounter)
_ = metrics.Register("instance.read", readInstanceCounter)
_ = metrics.Register("instance.write", writeInstanceCounter)
_ = writeBufferLatency.AddMany([]string{"wait", "write"})
writeBufferLatency.Start("wait")

Expand Down Expand Up @@ -389,7 +382,7 @@ Cleanup:
}

latency.Stop("instance")
readTopologyInstanceCounter.Inc(1)
readTopologyInstanceCounter.Add(1)

if instanceFound {
instance.LastDiscoveryLatency = time.Since(readingStartTime)
Expand Down Expand Up @@ -618,7 +611,7 @@ func ReadInstance(tabletAlias string) (*Instance, bool, error) {
instances, err := readInstancesByCondition(condition, sqlutils.Args(tabletAlias), "")
// We know there will be at most one (alias is the PK).
// And we expect to find one.
readInstanceCounter.Inc(1)
readInstanceCounter.Add(1)
if len(instances) == 0 {
return nil, false, err
}
Expand Down
38 changes: 15 additions & 23 deletions go/vt/vtorc/logic/vtorc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
"time"

"github.com/patrickmn/go-cache"
"github.com/rcrowley/go-metrics"
"github.com/sjmudd/stopwatch"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vtorc/collection"
Expand All @@ -51,14 +51,14 @@ var snapshotDiscoveryKeys chan string
var snapshotDiscoveryKeysMutex sync.Mutex
var hasReceivedSIGTERM int32

var discoveriesCounter = metrics.NewCounter()
var failedDiscoveriesCounter = metrics.NewCounter()
var instancePollSecondsExceededCounter = metrics.NewCounter()
var discoveryQueueLengthGauge = metrics.NewGauge()
var discoveryRecentCountGauge = metrics.NewGauge()
var isElectedGauge = metrics.NewGauge()
var isHealthyGauge = metrics.NewGauge()
var discoveriesCounter = stats.NewCounter("discoveries.attempt", "Number of discoveries attempted")
var failedDiscoveriesCounter = stats.NewCounter("discoveries.fail", "Number of failed discoveries")
var instancePollSecondsExceededCounter = stats.NewCounter("discoveries.instance_poll_seconds_exceeded", "Number of instances that took longer than InstancePollSeconds to poll")
var discoveryQueueLengthGauge = stats.NewGauge("discoveries.queue_length", "Length of the discovery queue")
var discoveryRecentCountGauge = stats.NewGauge("discoveries.recent_count", "Number of recent discoveries")
var discoveryMetrics = collection.CreateOrReturnCollection(DiscoveryMetricsName)
var isElectedGauge = stats.NewGauge("elect.is_elected", "Elected state")
var isHealthyGauge = stats.NewGauge("health.is_healthy", "Healthy state")

var isElectedNode int64

Expand All @@ -67,28 +67,20 @@ var recentDiscoveryOperationKeys *cache.Cache
func init() {
snapshotDiscoveryKeys = make(chan string, 10)

_ = metrics.Register("discoveries.attempt", discoveriesCounter)
_ = metrics.Register("discoveries.fail", failedDiscoveriesCounter)
_ = metrics.Register("discoveries.instance_poll_seconds_exceeded", instancePollSecondsExceededCounter)
_ = metrics.Register("discoveries.queue_length", discoveryQueueLengthGauge)
_ = metrics.Register("discoveries.recent_count", discoveryRecentCountGauge)
_ = metrics.Register("elect.is_elected", isElectedGauge)
_ = metrics.Register("health.is_healthy", isHealthyGauge)

ometrics.OnMetricsTick(func() {
discoveryQueueLengthGauge.Update(int64(discoveryQueue.QueueLen()))
discoveryQueueLengthGauge.Set(int64(discoveryQueue.QueueLen()))
})
ometrics.OnMetricsTick(func() {
if recentDiscoveryOperationKeys == nil {
return
}
discoveryRecentCountGauge.Update(int64(recentDiscoveryOperationKeys.ItemCount()))
discoveryRecentCountGauge.Set(int64(recentDiscoveryOperationKeys.ItemCount()))
})
ometrics.OnMetricsTick(func() {
isElectedGauge.Update(atomic.LoadInt64(&isElectedNode))
isElectedGauge.Set(atomic.LoadInt64(&isElectedNode))
})
ometrics.OnMetricsTick(func() {
isHealthyGauge.Update(atomic.LoadInt64(&process.LastContinousCheckHealthy))
isHealthyGauge.Set(atomic.LoadInt64(&process.LastContinousCheckHealthy))
})
}

Expand Down Expand Up @@ -197,7 +189,7 @@ func DiscoverInstance(tabletAlias string, forceDiscovery bool) {
latency.Stop("total")
discoveryTime := latency.Elapsed("total")
if discoveryTime > instancePollSecondsDuration() {
instancePollSecondsExceededCounter.Inc(1)
instancePollSecondsExceededCounter.Add(1)
log.Warningf("discoverInstance exceeded InstancePollSeconds for %+v, took %.4fs", tabletAlias, discoveryTime.Seconds())
if metric != nil {
metric.InstancePollSecondsDurationCount = 1
Expand Down Expand Up @@ -225,7 +217,7 @@ func DiscoverInstance(tabletAlias string, forceDiscovery bool) {
return
}

discoveriesCounter.Inc(1)
discoveriesCounter.Add(1)

// First we've ever heard of this instance. Continue investigation:
instance, err := inst.ReadTopologyInstanceBufferable(tabletAlias, latency)
Expand All @@ -240,7 +232,7 @@ func DiscoverInstance(tabletAlias string, forceDiscovery bool) {
}

if instance == nil {
failedDiscoveriesCounter.Inc(1)
failedDiscoveriesCounter.Add(1)
metric = &discovery.Metric{
Timestamp: time.Now(),
TabletAlias: tabletAlias,
Expand Down

0 comments on commit c6ee558

Please sign in to comment.