From 3805ead385cbd7f3bf57d41facefbe0fe2e848c7 Mon Sep 17 00:00:00 2001 From: cskh Date: Mon, 8 Aug 2022 16:55:54 -0400 Subject: [PATCH] Add metrics labels (#658) * Add labels to metrics * upgrade memberlist to 0.4.0 Co-authored-by: R.B. Boyer --- coordinate/client.go | 2 +- coordinate/config.go | 7 +++++++ go.mod | 2 +- go.sum | 7 ++++--- serf/config.go | 4 ++++ serf/delegate.go | 8 ++++---- serf/ping_delegate.go | 4 ++-- serf/serf.go | 37 +++++++++++++++++++++++-------------- serf/snapshot.go | 5 +++-- 9 files changed, 49 insertions(+), 27 deletions(-) diff --git a/coordinate/client.go b/coordinate/client.go index 3582ee4da..32124a73a 100644 --- a/coordinate/client.go +++ b/coordinate/client.go @@ -218,7 +218,7 @@ func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) (*Coo return nil, fmt.Errorf("round trip time not in valid range, duration %v is not a positive value less than %v ", rtt, maxRTT) } if rtt == 0 { - metrics.IncrCounter([]string{"serf", "coordinate", "zero-rtt"}, 1) + metrics.IncrCounterWithLabels([]string{"serf", "coordinate", "zero-rtt"}, 1, c.config.MetricLabels) } rttSeconds := c.latencyFilter(node, rtt.Seconds()) diff --git a/coordinate/config.go b/coordinate/config.go index b85a8ab7b..09c0cafe8 100644 --- a/coordinate/config.go +++ b/coordinate/config.go @@ -1,5 +1,9 @@ package coordinate +import ( + "github.com/armon/go-metrics" +) + // Config is used to set the parameters of the Vivaldi-based coordinate mapping // algorithm. // @@ -52,6 +56,9 @@ type Config struct { // GravityRho is a tuning factor that sets how much gravity has an effect // to try to re-center coordinates. See [2] for more details. GravityRho float64 + + // metricLabels is the slice of labels to put on all emitted metrics + MetricLabels []metrics.Label } // DefaultConfig returns a Config that has some default values suitable for diff --git a/go.mod b/go.mod index 976c4008a..ebd0d97c8 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/hashicorp/go-uuid v1.0.1 // indirect github.com/hashicorp/logutils v1.0.0 github.com/hashicorp/mdns v1.0.4 - github.com/hashicorp/memberlist v0.3.0 + github.com/hashicorp/memberlist v0.4.0 github.com/mattn/go-colorable v0.1.6 // indirect github.com/mitchellh/cli v1.1.0 github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee diff --git a/go.sum b/go.sum index 281ed7761..8c6af7645 100644 --- a/go.sum +++ b/go.sum @@ -37,8 +37,8 @@ github.com/hashicorp/logutils v1.0.0 h1:dLEQVugN8vlakKOUE3ihGLTZJRB4j+M2cdTm/ORI github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= github.com/hashicorp/mdns v1.0.4 h1:sY0CMhFmjIPDMlTB+HfymFHCaYLhgifZ0QhjaYKD/UQ= github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc= -github.com/hashicorp/memberlist v0.3.0 h1:8+567mCcFDnS5ADl7lrpxPMWiFCElyUEeW0gtj34fMA= -github.com/hashicorp/memberlist v0.3.0/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= +github.com/hashicorp/memberlist v0.4.0 h1:k3uda5gZcltmafuFF+UFqNEl5PrH+yPZ4zkjp1f/H/8= +github.com/hashicorp/memberlist v0.4.0/go.mod h1:yvyXLpo0QaGE59Y7hDTsTzDD25JYBZ4mHgHUZ8lrOI0= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= github.com/mattn/go-colorable v0.1.6 h1:6Su7aK7lXmJ/U79bYtBjLNaha4Fs1Rg9plHpcH+vvnE= @@ -91,8 +91,9 @@ golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44 h1:Bli41pIlzTzf3KEY06n+xnzK/BESIg2ze4Pgfh/aI8c= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= diff --git a/serf/config.go b/serf/config.go index 57b5a98e5..b71b36cac 100644 --- a/serf/config.go +++ b/serf/config.go @@ -6,6 +6,7 @@ import ( "os" "time" + "github.com/armon/go-metrics" "github.com/hashicorp/memberlist" ) @@ -262,6 +263,9 @@ type Config struct { // contain alphanumeric, dashes and '.'characters // and sets maximum length to 128 characters ValidateNodeNames bool + + // MetricLabels is a map of optional labels to apply to all metrics emitted. + MetricLabels []metrics.Label } // Init allocates the subdata structures diff --git a/serf/delegate.go b/serf/delegate.go index a6d23d116..41537396a 100644 --- a/serf/delegate.go +++ b/serf/delegate.go @@ -30,7 +30,7 @@ func (d *delegate) NotifyMsg(buf []byte) { if len(buf) == 0 { return } - metrics.AddSample([]string{"serf", "msgs", "received"}, float32(len(buf))) + metrics.AddSampleWithLabels([]string{"serf", "msgs", "received"}, float32(len(buf)), d.serf.metricLabels) rebroadcast := false rebroadcastQueue := d.serf.broadcasts @@ -142,7 +142,7 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { for _, msg := range msgs { lm := len(msg) bytesUsed += lm + overhead - metrics.AddSample([]string{"serf", "msgs", "sent"}, float32(lm)) + metrics.AddSampleWithLabels([]string{"serf", "msgs", "sent"}, float32(lm), d.serf.metricLabels) } // Get any additional query broadcasts @@ -151,7 +151,7 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { for _, m := range queryMsgs { lm := len(m) bytesUsed += lm + overhead - metrics.AddSample([]string{"serf", "msgs", "sent"}, float32(lm)) + metrics.AddSampleWithLabels([]string{"serf", "msgs", "sent"}, float32(lm), d.serf.metricLabels) } msgs = append(msgs, queryMsgs...) } @@ -162,7 +162,7 @@ func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { for _, m := range eventMsgs { lm := len(m) bytesUsed += lm + overhead - metrics.AddSample([]string{"serf", "msgs", "sent"}, float32(lm)) + metrics.AddSampleWithLabels([]string{"serf", "msgs", "sent"}, float32(lm), d.serf.keyManager.serf.metricLabels) } msgs = append(msgs, eventMsgs...) } diff --git a/serf/ping_delegate.go b/serf/ping_delegate.go index 98032c5be..63d4bdab7 100644 --- a/serf/ping_delegate.go +++ b/serf/ping_delegate.go @@ -68,7 +68,7 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat before := p.serf.coordClient.GetCoordinate() after, err := p.serf.coordClient.Update(other.Name, &coord, rtt) if err != nil { - metrics.IncrCounter([]string{"serf", "coordinate", "rejected"}, 1) + metrics.IncrCounterWithLabels([]string{"serf", "coordinate", "rejected"}, 1, p.serf.metricLabels) p.serf.logger.Printf("[TRACE] serf: Rejected coordinate from %s: %v\n", other.Name, err) return @@ -77,7 +77,7 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat // Publish some metrics to give us an idea of how much we are // adjusting each time we update. d := float32(before.DistanceTo(after).Seconds() * 1.0e3) - metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d) + metrics.AddSampleWithLabels([]string{"serf", "coordinate", "adjustment-ms"}, d, p.serf.metricLabels) // Cache the coordinate for the other node, and add our own // to the cache as well since it just got updated. This lets diff --git a/serf/serf.go b/serf/serf.go index c9e56562d..51543ba83 100644 --- a/serf/serf.go +++ b/serf/serf.go @@ -105,6 +105,9 @@ type Serf struct { coordClient *coordinate.Client coordCache map[string]*coordinate.Coordinate coordCacheLock sync.RWMutex + + // metricLabels is the slice of labels to put on all emitted metrics + metricLabels []metrics.Label } // SerfState is the state of the Serf instance. @@ -270,6 +273,7 @@ func Create(conf *Config) (*Serf, error) { queryResponse: make(map[LamportTime]*QueryResponse), shutdownCh: make(chan struct{}), state: SerfAlive, + metricLabels: conf.MetricLabels, } serf.eventJoinIgnore.Store(false) @@ -313,7 +317,9 @@ func Create(conf *Config) (*Serf, error) { // Set up network coordinate client. if !conf.DisableCoordinates { - serf.coordClient, err = coordinate.NewClient(coordinate.DefaultConfig()) + coordinateConfig := coordinate.DefaultConfig() + coordinateConfig.MetricLabels = serf.metricLabels + serf.coordClient, err = coordinate.NewClient(coordinateConfig) if err != nil { return nil, fmt.Errorf("Failed to create coordinate client: %v", err) } @@ -334,6 +340,7 @@ func Create(conf *Config) (*Serf, error) { if err != nil { return nil, fmt.Errorf("Failed to setup snapshot: %v", err) } + snap.metricLabels = serf.metricLabels serf.snapshotter = snap conf.EventCh = eventCh prev = snap.AliveNodes() @@ -404,6 +411,8 @@ func Create(conf *Config) (*Serf, error) { conf.MemberlistConfig.Alive = md } + conf.MemberlistConfig.MetricLabels = conf.MetricLabels + // Create the underlying memberlist that will manage membership // and failure detection for the Serf instance. memberlist, err := memberlist.Create(conf.MemberlistConfig) @@ -953,7 +962,7 @@ func (s *Serf) handleNodeJoin(n *memberlist.Node) { oldStatus = member.Status deadTime := time.Now().Sub(member.leaveTime) if oldStatus == StatusFailed && deadTime < s.config.FlapTimeout { - metrics.IncrCounter([]string{"serf", "member", "flap"}, 1) + metrics.IncrCounterWithLabels([]string{"serf", "member", "flap"}, 1, s.metricLabels) } member.Status = StatusAlive @@ -980,7 +989,7 @@ func (s *Serf) handleNodeJoin(n *memberlist.Node) { } // Update some metrics - metrics.IncrCounter([]string{"serf", "member", "join"}, 1) + metrics.IncrCounterWithLabels([]string{"serf", "member", "join"}, 1, s.metricLabels) // Send an event along s.logger.Printf("[INFO] serf: EventMemberJoin: %s %s", @@ -1030,7 +1039,7 @@ func (s *Serf) handleNodeLeave(n *memberlist.Node) { } // Update some metrics - metrics.IncrCounter([]string{"serf", "member", member.Status.String()}, 1) + metrics.IncrCounterWithLabels([]string{"serf", "member", member.Status.String()}, 1, s.metricLabels) s.logger.Printf("[INFO] serf: %s: %s %s", eventStr, member.Member.Name, member.Member.Addr) @@ -1074,7 +1083,7 @@ func (s *Serf) handleNodeUpdate(n *memberlist.Node) { member.DelegateCur = n.DCur // Update some metrics - metrics.IncrCounter([]string{"serf", "member", "update"}, 1) + metrics.IncrCounterWithLabels([]string{"serf", "member", "update"}, 1, s.metricLabels) // Send an event along s.logger.Printf("[INFO] serf: EventMemberUpdate: %s", member.Member.Name) @@ -1272,8 +1281,8 @@ func (s *Serf) handleUserEvent(eventMsg *messageUserEvent) bool { seen.Events = append(seen.Events, userEvent) // Update some metrics - metrics.IncrCounter([]string{"serf", "events"}, 1) - metrics.IncrCounter([]string{"serf", "events", eventMsg.Name}, 1) + metrics.IncrCounterWithLabels([]string{"serf", "events"}, 1, s.metricLabels) + metrics.IncrCounterWithLabels([]string{"serf", "events", eventMsg.Name}, 1, s.metricLabels) if s.config.EventCh != nil { s.config.EventCh <- UserEvent{ @@ -1331,8 +1340,8 @@ func (s *Serf) handleQuery(query *messageQuery) bool { seen.QueryIDs = append(seen.QueryIDs, query.ID) // Update some metrics - metrics.IncrCounter([]string{"serf", "queries"}, 1) - metrics.IncrCounter([]string{"serf", "queries", query.Name}, 1) + metrics.IncrCounterWithLabels([]string{"serf", "queries"}, 1, s.metricLabels) + metrics.IncrCounterWithLabels([]string{"serf", "queries", query.Name}, 1, s.metricLabels) // Check if we should rebroadcast, this may be disabled by a flag rebroadcast := true @@ -1419,11 +1428,11 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) { if resp.Ack() { // Exit early if this is a duplicate ack if _, ok := query.acks[resp.From]; ok { - metrics.IncrCounter([]string{"serf", "query_duplicate_acks"}, 1) + metrics.IncrCounterWithLabels([]string{"serf", "query_duplicate_acks"}, 1, s.metricLabels) return } - metrics.IncrCounter([]string{"serf", "query_acks"}, 1) + metrics.IncrCounterWithLabels([]string{"serf", "query_acks"}, 1, s.metricLabels) err := query.sendAck(resp) if err != nil { s.logger.Printf("[WARN] %v", err) @@ -1431,11 +1440,11 @@ func (s *Serf) handleQueryResponse(resp *messageQueryResponse) { } else { // Exit early if this is a duplicate response if _, ok := query.responses[resp.From]; ok { - metrics.IncrCounter([]string{"serf", "query_duplicate_responses"}, 1) + metrics.IncrCounterWithLabels([]string{"serf", "query_duplicate_responses"}, 1, s.metricLabels) return } - metrics.IncrCounter([]string{"serf", "query_responses"}, 1) + metrics.IncrCounterWithLabels([]string{"serf", "query_responses"}, 1, s.metricLabels) err := query.sendResponse(NodeResponse{From: resp.From, Payload: resp.Payload}) if err != nil { s.logger.Printf("[WARN] %v", err) @@ -1676,7 +1685,7 @@ func (s *Serf) checkQueueDepth(name string, queue *memberlist.TransmitLimitedQue select { case <-time.After(s.config.QueueCheckInterval): numq := queue.NumQueued() - metrics.AddSample([]string{"serf", "queue", name}, float32(numq)) + metrics.AddSampleWithLabels([]string{"serf", "queue", name}, float32(numq), s.metricLabels) if numq >= s.config.QueueDepthWarning { s.logger.Printf("[WARN] serf: %s queue depth: %d", name, numq) } diff --git a/serf/snapshot.go b/serf/snapshot.go index d2eda0ea2..b62f92434 100644 --- a/serf/snapshot.go +++ b/serf/snapshot.go @@ -78,6 +78,7 @@ type Snapshotter struct { shutdownCh <-chan struct{} waitCh chan struct{} lastAttemptedCompaction time.Time + metricLabels []metrics.Label } // PreviousNode is used to represent the previously known alive nodes @@ -390,7 +391,7 @@ func (s *Snapshotter) tryAppend(l string) { // appendLine is used to append a line to the existing log func (s *Snapshotter) appendLine(l string) error { - defer metrics.MeasureSince([]string{"serf", "snapshot", "appendLine"}, time.Now()) + defer metrics.MeasureSinceWithLabels([]string{"serf", "snapshot", "appendLine"}, time.Now(), s.metricLabels) n, err := s.buffered.WriteString(l) if err != nil { @@ -429,7 +430,7 @@ func (s *Snapshotter) snapshotMaxSize() int64 { // Compact is used to compact the snapshot once it is too large func (s *Snapshotter) compact() error { - defer metrics.MeasureSince([]string{"serf", "snapshot", "compact"}, time.Now()) + defer metrics.MeasureSinceWithLabels([]string{"serf", "snapshot", "compact"}, time.Now(), s.metricLabels) // Try to open the file to new fiel newPath := s.path + tmpExt