Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions balancer/pickfirst/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,31 @@ func (s) TestPickFirstMetrics(t *testing.T) {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
}

// Check the subchannel metrics as well.
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 1)
}
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 0)
}
if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 0)
}
if got, _ := tmr.Metric("grpc.subchannel.open_connections"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.open_connections", got, 1)
}

ss.Stop()
testutils.AwaitState(ctx, t, cc, connectivity.Idle)
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 1)
}
if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 1)
}
if got, _ := tmr.Metric("grpc.subchannel.open_connections"); got != -1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.open_connections", got, -1)
}
}

// TestPickFirstMetricsFailure tests the connection attempts failed metric. It
Expand Down
54 changes: 54 additions & 0 deletions balancer/pickfirst/pickfirst_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/json"
"errors"
"fmt"
"slices"
"strings"
"sync"
"testing"
Expand Down Expand Up @@ -1946,6 +1947,20 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_AfterEndOfList(t *testing.T) {
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
}

if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 0)
}
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 1)
}
expectedLabels := []string{"whatever:///test.server", "", "{region=\"\", zone=\"\", sub_zone=\"\"}"}
if detail, _ := tmr.MetricDetail("grpc.subchannel.connection_attempts_failed"); !slices.Equal(detail.LabelVals, expectedLabels) {
t.Errorf("Unexpected label values for metric %v, got: %v, want %v", "grpc.subchannel.connection_attempts_failed", detail.LabelVals, expectedLabels)
}
if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 0)
}
}

// Test verifies that pickfirst attempts to connect to the second backend once
Expand Down Expand Up @@ -2006,6 +2021,27 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TriggerConnectionDelay(t *testing.T) {
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
}

if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 1)
}
expectedLabels := []string{"whatever:///test.server", "", "{region=\"\", zone=\"\", sub_zone=\"\"}"}
if got, _ := tmr.MetricDetail("grpc.subchannel.connection_attempts_succeeded"); !slices.Equal(got.LabelVals, expectedLabels) {
t.Errorf("Unexpected data for metric %v, got: %s, want: %s", "grpc.subchannel.connection_attempts_succeeded", got.LabelVals, expectedLabels)
}
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 0)
}
if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 0)
}
if got, _ := tmr.Metric("grpc.subchannel.open_connections"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.open_connections", got, 1)
}
expectedLabels = []string{"whatever:///test.server", "", "NoSecurity", "{region=\"\", zone=\"\", sub_zone=\"\"}"}
if got, _ := tmr.MetricDetail("grpc.subchannel.open_connections"); !slices.Equal(got.LabelVals, expectedLabels) {
t.Errorf("Unexpected data for metric %v, got: %s, want: %s", "grpc.subchannel.open_connections", got.LabelVals, expectedLabels)
}
}

// Test tests the pickfirst balancer by causing a SubConn to fail and then
Expand Down Expand Up @@ -2057,6 +2093,13 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_ThenTimerFires(t *testing.T) {
if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_failed"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_failed", got, 1)
}
if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 1)
}
expectedLabels := []string{"whatever:///test.server", "", "{region=\"\", zone=\"\", sub_zone=\"\"}"}
if got, _ := tmr.MetricDetail("grpc.subchannel.connection_attempts_failed"); !slices.Equal(got.LabelVals, expectedLabels) {
t.Errorf("Unexpected data for metric %v, got: %s, want: %s", "grpc.subchannel.connection_attempts_failed", got.LabelVals, expectedLabels)
}
if holds[2].IsStarted() != false {
t.Fatalf("Server %d with address %q contacted unexpectedly", 2, addrs[2])
}
Expand All @@ -2080,6 +2123,17 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_ThenTimerFires(t *testing.T) {
if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0)
}

if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 1)
}
expectedLabels = []string{"whatever:///test.server", "", "{region=\"\", zone=\"\", sub_zone=\"\"}"}
if got, _ := tmr.MetricDetail("grpc.subchannel.connection_attempts_succeeded"); !slices.Equal(got.LabelVals, expectedLabels) {
t.Errorf("Unexpected data for metric %v, got: %s, want: %s", "grpc.subchannel.connection_attempts_succeeded", got.LabelVals, expectedLabels)
}
if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 0 {
t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 0)
}
}

func (s) TestPickFirstLeaf_InterleavingIPV4Preferred(t *testing.T) {
Expand Down
26 changes: 18 additions & 8 deletions balancer/weightedroundrobin/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ var (
Description: "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints with valid weight, which caused the WRR policy to fall back to RR behavior.",
Unit: "{update}",
Labels: []string{"grpc.target"},
OptionalLabels: []string{"grpc.lb.locality"},
OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
Default: false,
})

Expand All @@ -71,7 +71,7 @@ var (
Description: "EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable weight information (i.e., either the load report has not yet been received, or it is within the blackout period).",
Unit: "{endpoint}",
Labels: []string{"grpc.target"},
OptionalLabels: []string{"grpc.lb.locality"},
OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
Default: false,
})

Expand All @@ -80,15 +80,15 @@ var (
Description: "EXPERIMENTAL. Number of endpoints from each scheduler update whose latest weight is older than the expiration period.",
Unit: "{endpoint}",
Labels: []string{"grpc.target"},
OptionalLabels: []string{"grpc.lb.locality"},
OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
Default: false,
})
endpointWeightsMetric = estats.RegisterFloat64Histo(estats.MetricDescriptor{
Name: "grpc.lb.wrr.endpoint_weights",
Description: "EXPERIMENTAL. Weight of each endpoint, recorded on every scheduler update. Endpoints without usable weights will be recorded as weight 0.",
Unit: "{endpoint}",
Labels: []string{"grpc.target"},
OptionalLabels: []string{"grpc.lb.locality"},
OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"},
Default: false,
})
)
Expand Down Expand Up @@ -173,6 +173,7 @@ func (b *wrrBalancer) updateEndpointsLocked(endpoints []resolver.Endpoint) {
metricsRecorder: b.metricsRecorder,
target: b.target,
locality: b.locality,
cluster: b.clusterName,
}
for _, addr := range endpoint.Addresses {
b.addressWeights.Set(addr, ew)
Expand Down Expand Up @@ -211,6 +212,7 @@ type wrrBalancer struct {
mu sync.Mutex
cfg *lbConfig // active config
locality string
clusterName string
stopPicker *grpcsync.Event
addressWeights *resolver.AddressMapV2[*endpointWeight]
endpointToWeight *resolver.EndpointMap[*endpointWeight]
Expand All @@ -231,6 +233,11 @@ func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
b.mu.Lock()
b.cfg = cfg
b.locality = weightedtarget.LocalityFromResolverState(ccs.ResolverState)
if cluster, ok := resolver.GetBackendServiceFromState(ccs.ResolverState); !ok {
b.logger.Infof("Backend service name not found in resolver state attributes.")
} else {
b.clusterName = cluster
}
b.updateEndpointsLocked(ccs.ResolverState.Endpoints)
b.mu.Unlock()

Expand Down Expand Up @@ -288,6 +295,7 @@ func (b *wrrBalancer) UpdateState(state balancer.State) {
metricsRecorder: b.metricsRecorder,
locality: b.locality,
target: b.target,
clusterName: b.clusterName,
}

b.stopPicker = grpcsync.NewEvent()
Expand Down Expand Up @@ -420,6 +428,7 @@ type picker struct {
// The following fields are immutable.
target string
locality string
clusterName string
metricsRecorder estats.MetricsRecorder
}

Expand Down Expand Up @@ -499,6 +508,7 @@ type endpointWeight struct {
target string
metricsRecorder estats.MetricsRecorder
locality string
cluster string

// The following fields are only accessed on calls into the LB policy, and
// do not need a mutex.
Expand Down Expand Up @@ -602,14 +612,14 @@ func (w *endpointWeight) weight(now time.Time, weightExpirationPeriod, blackoutP

if recordMetrics {
defer func() {
endpointWeightsMetric.Record(w.metricsRecorder, weight, w.target, w.locality)
endpointWeightsMetric.Record(w.metricsRecorder, weight, w.target, w.locality, w.cluster)
}()
}

// The endpoint has not received a load report (i.e. just turned READY with
// no load report).
if w.lastUpdated.Equal(time.Time{}) {
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.cluster)
return 0
}

Expand All @@ -618,7 +628,7 @@ func (w *endpointWeight) weight(now time.Time, weightExpirationPeriod, blackoutP
// start getting data again in the future, and return 0.
if now.Sub(w.lastUpdated) >= weightExpirationPeriod {
if recordMetrics {
endpointWeightStaleMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
endpointWeightStaleMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.cluster)
}
w.nonEmptySince = time.Time{}
return 0
Expand All @@ -627,7 +637,7 @@ func (w *endpointWeight) weight(now time.Time, weightExpirationPeriod, blackoutP
// If we don't have at least blackoutPeriod worth of data, return 0.
if blackoutPeriod != 0 && (w.nonEmptySince.Equal(time.Time{}) || now.Sub(w.nonEmptySince) < blackoutPeriod) {
if recordMetrics {
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality)
endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.cluster)
}
return 0
}
Expand Down
4 changes: 2 additions & 2 deletions balancer/weightedroundrobin/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (p *picker) newScheduler(recordMetrics bool) scheduler {
}
if n == 1 {
if recordMetrics {
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality, p.clusterName)
}
return &rrScheduler{numSCs: 1, inc: p.inc}
}
Expand All @@ -58,7 +58,7 @@ func (p *picker) newScheduler(recordMetrics bool) scheduler {

if numZero >= n-1 {
if recordMetrics {
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality)
rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality, p.clusterName)
}
return &rrScheduler{numSCs: uint32(n), inc: p.inc}
}
Expand Down
78 changes: 78 additions & 0 deletions clientconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,16 @@ import (
"google.golang.org/grpc/balancer/pickfirst"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
expstats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/idle"
iresolver "google.golang.org/grpc/internal/resolver"
istats "google.golang.org/grpc/internal/stats"
"google.golang.org/grpc/internal/transport"
xdsinternal "google.golang.org/grpc/internal/xds"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
Expand Down Expand Up @@ -98,6 +101,41 @@ var (
errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)")
)

// Subchannel-level connection metrics (gRFC A78). All of these are
// experimental and disabled by default; they are recorded with the channel's
// target plus optional xDS telemetry labels (backend service / locality).
var (
	// disconnectionsMetric counts transitions of the selected subchannel out
	// of the connected state.
	disconnectionsMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
		Name:           "grpc.subchannel.disconnections",
		Description:    "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.",
		Unit:           "{disconnection}",
		Labels:         []string{"grpc.target"},
		OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality", "grpc.disconnect_error"},
		Default:        false,
	})
	// connectionAttemptsSucceededMetric counts connection attempts that
	// produced a usable transport.
	connectionAttemptsSucceededMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
		Name:           "grpc.subchannel.connection_attempts_succeeded",
		Description:    "EXPERIMENTAL. Number of successful connection attempts.",
		Unit:           "{attempt}",
		Labels:         []string{"grpc.target"},
		OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality"},
		Default:        false,
	})
	// connectionAttemptsFailedMetric counts connection attempts that failed
	// on every address in the subchannel's address list.
	connectionAttemptsFailedMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{
		Name:           "grpc.subchannel.connection_attempts_failed",
		Description:    "EXPERIMENTAL. Number of failed connection attempts.",
		Unit:           "{attempt}",
		Labels:         []string{"grpc.target"},
		OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality"},
		Default:        false,
	})
	// openConnectionsMetric is an up/down counter tracking the number of
	// currently open connections (+1 on success, -1 on disconnect).
	openConnectionsMetric = expstats.RegisterInt64UpDownCount(expstats.MetricDescriptor{
		Name:        "grpc.subchannel.open_connections",
		Description: "EXPERIMENTAL. Number of open connections.",
		// Fixed: unit was "{attempt}", a copy/paste from the attempt
		// counters above; this gauge counts connections.
		Unit:           "{connection}",
		Labels:         []string{"grpc.target"},
		OptionalLabels: []string{"grpc.lb.backend_service", "grpc.security_level", "grpc.lb.locality"},
		Default:        false,
	})
)

const (
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
defaultClientMaxSendMessageSize = math.MaxInt32
Expand Down Expand Up @@ -1223,6 +1261,18 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error)
if ac.state == s {
return
}

var locality, backendService string
if len(ac.addrs) > 0 {
labels := xdsinternal.AddressToTelemetryLabels(ac.addrs[0])
locality = labels["grpc.lb.locality"]
backendService = labels["grpc.lb.backend_service"]
}

if ac.state == connectivity.Ready || (ac.state == connectivity.Connecting && s == connectivity.Idle) {
disconnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, backendService, locality, "unknown")
openConnectionsMetric.Record(ac.cc.metricsRecorderList, -1, ac.cc.target, backendService, ac.securityLevel(), locality)
}
ac.state = s
ac.channelz.ChannelMetrics.State.Store(&s)
if lastErr == nil {
Expand Down Expand Up @@ -1276,10 +1326,18 @@ func (ac *addrConn) resetTransportAndUnlock() {
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm
connectDeadline := time.Now().Add(dialDuration)

var locality, backendService string
if len(ac.addrs) > 0 {
labels := xdsinternal.AddressToTelemetryLabels(ac.addrs[0])
locality = labels["grpc.lb.locality"]
backendService = labels["grpc.lb.backend_service"]
}

ac.updateConnectivityState(connectivity.Connecting, nil)
ac.mu.Unlock()

if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil {
connectionAttemptsFailedMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, backendService, locality)
// TODO: #7534 - Move re-resolution requests into the pick_first LB policy
// to ensure one resolution request per pass instead of per subconn failure.
ac.cc.resolveNow(resolver.ResolveNowOptions{})
Expand Down Expand Up @@ -1319,10 +1377,30 @@ func (ac *addrConn) resetTransportAndUnlock() {
}
// Success; reset backoff.
ac.mu.Lock()
connectionAttemptsSucceededMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, backendService, locality)
openConnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, backendService, ac.securityLevel(), locality)
ac.backoffIdx = 0
ac.mu.Unlock()
}

// securityLevelKey is the attribute key under which a subchannel caches the
// string form of its transport's security level on ac.curAddr.
type securityLevelKey struct{}

// securityLevel returns the security level of the subchannel's current
// transport, for use as the "grpc.security_level" metric label. When no
// transport exists (e.g. after a disconnect), it falls back to the value
// previously cached on the current address's attributes; if nothing is
// cached, or the transport's AuthInfo does not expose a
// credentials.CommonAuthInfo, it returns "".
//
// NOTE(review): this caches by reassigning ac.curAddr.Attributes — confirm
// the callers hold ac.mu (or otherwise serialize) around this mutation.
func (ac *addrConn) securityLevel() string {
	if ac.transport == nil {
		// No live transport: best effort from the cached attribute.
		cached, _ := ac.curAddr.Attributes.Value(securityLevelKey{}).(string)
		return cached
	}
	type commonAuthInfoProvider interface {
		GetCommonAuthInfo() credentials.CommonAuthInfo
	}
	provider, ok := ac.transport.AuthInfo().(commonAuthInfoProvider)
	if !ok {
		return ""
	}
	level := provider.GetCommonAuthInfo().SecurityLevel.String()
	// Remember the level so it survives the transport being torn down.
	ac.curAddr.Attributes = ac.curAddr.Attributes.WithValue(securityLevelKey{}, level)
	return level
}

// tryAllAddrs tries to create a connection to the addresses, and stop when at
// the first successful one. It returns an error if no address was successfully
// connected, or updates ac appropriately with the new transport.
Expand Down
Loading
Loading