From fba7b0793a0612fcbff8b51fec18fef33cf6ec7b Mon Sep 17 00:00:00 2001 From: Madhav Bissa Date: Thu, 13 Nov 2025 08:45:47 +0000 Subject: [PATCH] a89 wrr changes adding backend service to per call metrics a94 subchannel metrics with labels --- balancer/pickfirst/metrics_test.go | 20 ++ balancer/pickfirst/pickfirst_ext_test.go | 54 +++++ balancer/weightedroundrobin/balancer.go | 26 ++- balancer/weightedroundrobin/scheduler.go | 4 +- clientconn.go | 78 +++++++ .../testutils/stats/test_metrics_recorder.go | 50 ++-- internal/transport/http2_client.go | 2 + internal/transport/transport.go | 2 + .../xds/balancer/clusterimpl/clusterimpl.go | 4 +- internal/xds/xds.go | 11 + resolver/resolver.go | 16 ++ stats/opentelemetry/csm/observability_test.go | 7 +- stats/opentelemetry/e2e_test.go | 217 +++++++++++++++++- 13 files changed, 460 insertions(+), 31 deletions(-) diff --git a/balancer/pickfirst/metrics_test.go b/balancer/pickfirst/metrics_test.go index 0deb12715831..8544b5cd0bc3 100644 --- a/balancer/pickfirst/metrics_test.go +++ b/balancer/pickfirst/metrics_test.go @@ -102,11 +102,31 @@ func (s) TestPickFirstMetrics(t *testing.T) { t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0) } + //Checking for subchannel metrics as well + if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 { + t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 1) + } + if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 0 { + t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 0) + } + if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 0 { + t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 0) + } + if got, _ := tmr.Metric("grpc.subchannel.open_connections"); got != 1 { + t.Errorf("Unexpected data for 
metric %v, got: %v, want: %v", "grpc.subchannel.open_connections", got, 1) + } + ss.Stop() testutils.AwaitState(ctx, t, cc, connectivity.Idle) if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 1 { t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 1) } + if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 1 { + t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 1) + } + if got, _ := tmr.Metric("grpc.subchannel.open_connections"); got != -1 { + t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.open_connections", got, -1) + } } // TestPickFirstMetricsFailure tests the connection attempts failed metric. It diff --git a/balancer/pickfirst/pickfirst_ext_test.go b/balancer/pickfirst/pickfirst_ext_test.go index 3591a6e36baa..683b136a0bff 100644 --- a/balancer/pickfirst/pickfirst_ext_test.go +++ b/balancer/pickfirst/pickfirst_ext_test.go @@ -23,6 +23,7 @@ import ( "encoding/json" "errors" "fmt" + "slices" "strings" "sync" "testing" @@ -1946,6 +1947,20 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_AfterEndOfList(t *testing.T) { if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 { t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0) } + + if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 0 { + t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 0) + } + if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 1 { + t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 1) + } + expectedLabels := []string{"whatever:///test.server", "", "{region=\"\", zone=\"\", sub_zone=\"\"}"} + if detail, _ := tmr.MetricDetail("grpc.subchannel.connection_attempts_failed"); 
!slices.Equal(detail.LabelVals, expectedLabels) { + t.Errorf("Unexpected label values for metric %v, got: %v, want %v", "grpc.subchannel.connection_attempts_failed", detail.LabelVals, expectedLabels) + } + if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 0 { + t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 0) + } } // Test verifies that pickfirst attempts to connect to the second backend once @@ -2006,6 +2021,27 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TriggerConnectionDelay(t *testing.T) { if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 { t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0) } + + if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 { + t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 1) + } + expectedLabels := []string{"whatever:///test.server", "", "{region=\"\", zone=\"\", sub_zone=\"\"}"} + if got, _ := tmr.MetricDetail("grpc.subchannel.connection_attempts_succeeded"); !slices.Equal(got.LabelVals, expectedLabels) { + t.Errorf("Unexpected data for metric %v, got: %s, want: %s", "grpc.subchannel.connection_attempts_succeeded", got.LabelVals, expectedLabels) + } + if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 0 { + t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 0) + } + if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 0 { + t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 0) + } + if got, _ := tmr.Metric("grpc.subchannel.open_connections"); got != 1 { + t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.open_connections", got, 1) + } + expectedLabels = []string{"whatever:///test.server", "", "NoSecurity", 
"{region=\"\", zone=\"\", sub_zone=\"\"}"} + if got, _ := tmr.MetricDetail("grpc.subchannel.open_connections"); !slices.Equal(got.LabelVals, expectedLabels) { + t.Errorf("Unexpected data for metric %v, got: %s, want: %s", "grpc.subchannel.open_connections", got.LabelVals, expectedLabels) + } } // Test tests the pickfirst balancer by causing a SubConn to fail and then @@ -2057,6 +2093,13 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_ThenTimerFires(t *testing.T) { if got, _ := tmr.Metric("grpc.lb.pick_first.connection_attempts_failed"); got != 1 { t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.connection_attempts_failed", got, 1) } + if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_failed"); got != 1 { + t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_failed", got, 1) + } + expectedLabels := []string{"whatever:///test.server", "", "{region=\"\", zone=\"\", sub_zone=\"\"}"} + if got, _ := tmr.MetricDetail("grpc.subchannel.connection_attempts_failed"); !slices.Equal(got.LabelVals, expectedLabels) { + t.Errorf("Unexpected data for metric %v, got: %s, want: %s", "grpc.subchannel.connection_attempts_failed", got.LabelVals, expectedLabels) + } if holds[2].IsStarted() != false { t.Fatalf("Server %d with address %q contacted unexpectedly", 2, addrs[2]) } @@ -2080,6 +2123,17 @@ func (s) TestPickFirstLeaf_HappyEyeballs_TF_ThenTimerFires(t *testing.T) { if got, _ := tmr.Metric("grpc.lb.pick_first.disconnections"); got != 0 { t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.lb.pick_first.disconnections", got, 0) } + + if got, _ := tmr.Metric("grpc.subchannel.connection_attempts_succeeded"); got != 1 { + t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.connection_attempts_succeeded", got, 1) + } + expectedLabels = []string{"whatever:///test.server", "", "{region=\"\", zone=\"\", sub_zone=\"\"}"} + if got, _ := 
tmr.MetricDetail("grpc.subchannel.connection_attempts_succeeded"); !slices.Equal(got.LabelVals, expectedLabels) { + t.Errorf("Unexpected data for metric %v, got: %s, want: %s", "grpc.subchannel.connection_attempts_succeeded", got.LabelVals, expectedLabels) + } + if got, _ := tmr.Metric("grpc.subchannel.disconnections"); got != 0 { + t.Errorf("Unexpected data for metric %v, got: %v, want: %v", "grpc.subchannel.disconnections", got, 0) + } } func (s) TestPickFirstLeaf_InterleavingIPV4Preferred(t *testing.T) { diff --git a/balancer/weightedroundrobin/balancer.go b/balancer/weightedroundrobin/balancer.go index 0de02e5e5088..b76728006397 100644 --- a/balancer/weightedroundrobin/balancer.go +++ b/balancer/weightedroundrobin/balancer.go @@ -62,7 +62,7 @@ var ( Description: "EXPERIMENTAL. Number of scheduler updates in which there were not enough endpoints with valid weight, which caused the WRR policy to fall back to RR behavior.", Unit: "{update}", Labels: []string{"grpc.target"}, - OptionalLabels: []string{"grpc.lb.locality"}, + OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"}, Default: false, }) @@ -71,7 +71,7 @@ var ( Description: "EXPERIMENTAL. Number of endpoints from each scheduler update that don't yet have usable weight information (i.e., either the load report has not yet been received, or it is within the blackout period).", Unit: "{endpoint}", Labels: []string{"grpc.target"}, - OptionalLabels: []string{"grpc.lb.locality"}, + OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"}, Default: false, }) @@ -80,7 +80,7 @@ var ( Description: "EXPERIMENTAL. 
Number of endpoints from each scheduler update whose latest weight is older than the expiration period.", Unit: "{endpoint}", Labels: []string{"grpc.target"}, - OptionalLabels: []string{"grpc.lb.locality"}, + OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"}, Default: false, }) endpointWeightsMetric = estats.RegisterFloat64Histo(estats.MetricDescriptor{ @@ -88,7 +88,7 @@ var ( Description: "EXPERIMENTAL. Weight of each endpoint, recorded on every scheduler update. Endpoints without usable weights will be recorded as weight 0.", Unit: "{endpoint}", Labels: []string{"grpc.target"}, - OptionalLabels: []string{"grpc.lb.locality"}, + OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"}, Default: false, }) ) @@ -173,6 +173,7 @@ func (b *wrrBalancer) updateEndpointsLocked(endpoints []resolver.Endpoint) { metricsRecorder: b.metricsRecorder, target: b.target, locality: b.locality, + cluster: b.clusterName, } for _, addr := range endpoint.Addresses { b.addressWeights.Set(addr, ew) @@ -211,6 +212,7 @@ type wrrBalancer struct { mu sync.Mutex cfg *lbConfig // active config locality string + clusterName string stopPicker *grpcsync.Event addressWeights *resolver.AddressMapV2[*endpointWeight] endpointToWeight *resolver.EndpointMap[*endpointWeight] @@ -231,6 +233,11 @@ func (b *wrrBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error b.mu.Lock() b.cfg = cfg b.locality = weightedtarget.LocalityFromResolverState(ccs.ResolverState) + if cluster, ok := resolver.GetBackendServiceFromState(ccs.ResolverState); !ok { + b.logger.Infof("Backend service name not found in resolver state attributes.") + } else { + b.clusterName = cluster + } b.updateEndpointsLocked(ccs.ResolverState.Endpoints) b.mu.Unlock() @@ -288,6 +295,7 @@ func (b *wrrBalancer) UpdateState(state balancer.State) { metricsRecorder: b.metricsRecorder, locality: b.locality, target: b.target, + clusterName: b.clusterName, } b.stopPicker = grpcsync.NewEvent() @@ -420,6 +428,7 
@@ type picker struct { // The following fields are immutable. target string locality string + clusterName string metricsRecorder estats.MetricsRecorder } @@ -499,6 +508,7 @@ type endpointWeight struct { target string metricsRecorder estats.MetricsRecorder locality string + cluster string // The following fields are only accessed on calls into the LB policy, and // do not need a mutex. @@ -602,14 +612,14 @@ func (w *endpointWeight) weight(now time.Time, weightExpirationPeriod, blackoutP if recordMetrics { defer func() { - endpointWeightsMetric.Record(w.metricsRecorder, weight, w.target, w.locality) + endpointWeightsMetric.Record(w.metricsRecorder, weight, w.target, w.locality, w.cluster) }() } // The endpoint has not received a load report (i.e. just turned READY with // no load report). if w.lastUpdated.Equal(time.Time{}) { - endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality) + endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.cluster) return 0 } @@ -618,7 +628,7 @@ func (w *endpointWeight) weight(now time.Time, weightExpirationPeriod, blackoutP // start getting data again in the future, and return 0. if now.Sub(w.lastUpdated) >= weightExpirationPeriod { if recordMetrics { - endpointWeightStaleMetric.Record(w.metricsRecorder, 1, w.target, w.locality) + endpointWeightStaleMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.cluster) } w.nonEmptySince = time.Time{} return 0 @@ -627,7 +637,7 @@ func (w *endpointWeight) weight(now time.Time, weightExpirationPeriod, blackoutP // If we don't have at least blackoutPeriod worth of data, return 0. 
if blackoutPeriod != 0 && (w.nonEmptySince.Equal(time.Time{}) || now.Sub(w.nonEmptySince) < blackoutPeriod) { if recordMetrics { - endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality) + endpointWeightNotYetUsableMetric.Record(w.metricsRecorder, 1, w.target, w.locality, w.cluster) } return 0 } diff --git a/balancer/weightedroundrobin/scheduler.go b/balancer/weightedroundrobin/scheduler.go index 7d3d6815eb7a..8ed8d92a781c 100644 --- a/balancer/weightedroundrobin/scheduler.go +++ b/balancer/weightedroundrobin/scheduler.go @@ -39,7 +39,7 @@ func (p *picker) newScheduler(recordMetrics bool) scheduler { } if n == 1 { if recordMetrics { - rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality) + rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality, p.clusterName) } return &rrScheduler{numSCs: 1, inc: p.inc} } @@ -58,7 +58,7 @@ func (p *picker) newScheduler(recordMetrics bool) scheduler { if numZero >= n-1 { if recordMetrics { - rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality) + rrFallbackMetric.Record(p.metricsRecorder, 1, p.target, p.locality, p.clusterName) } return &rrScheduler{numSCs: uint32(n), inc: p.inc} } diff --git a/clientconn.go b/clientconn.go index c0c2c9a76abf..ab25731f6ff1 100644 --- a/clientconn.go +++ b/clientconn.go @@ -35,6 +35,8 @@ import ( "google.golang.org/grpc/balancer/pickfirst" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials" + expstats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpcsync" @@ -42,6 +44,7 @@ import ( iresolver "google.golang.org/grpc/internal/resolver" istats "google.golang.org/grpc/internal/stats" "google.golang.org/grpc/internal/transport" + xdsinternal "google.golang.org/grpc/internal/xds" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/resolver" 
"google.golang.org/grpc/serviceconfig" @@ -98,6 +101,41 @@ var ( errTransportCredentialsMissing = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportCredentials() to set)") ) +var ( + disconnectionsMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{ + Name: "grpc.subchannel.disconnections", + Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.", + Unit: "{disconnection}", + Labels: []string{"grpc.target"}, + OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality", "grpc.disconnect_error"}, + Default: false, + }) + connectionAttemptsSucceededMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{ + Name: "grpc.subchannel.connection_attempts_succeeded", + Description: "EXPERIMENTAL. Number of successful connection attempts.", + Unit: "{attempt}", + Labels: []string{"grpc.target"}, + OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality"}, + Default: false, + }) + connectionAttemptsFailedMetric = expstats.RegisterInt64Count(expstats.MetricDescriptor{ + Name: "grpc.subchannel.connection_attempts_failed", + Description: "EXPERIMENTAL. Number of failed connection attempts.", + Unit: "{attempt}", + Labels: []string{"grpc.target"}, + OptionalLabels: []string{"grpc.lb.backend_service", "grpc.lb.locality"}, + Default: false, + }) + openConnectionsMetric = expstats.RegisterInt64UpDownCount(expstats.MetricDescriptor{ + Name: "grpc.subchannel.open_connections", + Description: "EXPERIMENTAL. 
Number of open connections.", + Unit: "{attempt}", + Labels: []string{"grpc.target"}, + OptionalLabels: []string{"grpc.lb.backend_service", "grpc.security_level", "grpc.lb.locality"}, + Default: false, + }) +) + const ( defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4 defaultClientMaxSendMessageSize = math.MaxInt32 @@ -1223,6 +1261,18 @@ func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) if ac.state == s { return } + + var locality, backendService string + if len(ac.addrs) > 0 { + labels := xdsinternal.AddressToTelemetryLabels(ac.addrs[0]) + locality = labels["grpc.lb.locality"] + backendService = labels["grpc.lb.backend_service"] + } + + if ac.state == connectivity.Ready || (ac.state == connectivity.Connecting && s == connectivity.Idle) { + disconnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, backendService, locality, "unknown") + openConnectionsMetric.Record(ac.cc.metricsRecorderList, -1, ac.cc.target, backendService, ac.securityLevel(), locality) + } ac.state = s ac.channelz.ChannelMetrics.State.Store(&s) if lastErr == nil { @@ -1276,10 +1326,18 @@ func (ac *addrConn) resetTransportAndUnlock() { // https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md#proposed-backoff-algorithm connectDeadline := time.Now().Add(dialDuration) + var locality, backendService string + if len(ac.addrs) > 0 { + labels := xdsinternal.AddressToTelemetryLabels(ac.addrs[0]) + locality = labels["grpc.lb.locality"] + backendService = labels["grpc.lb.backend_service"] + } + ac.updateConnectivityState(connectivity.Connecting, nil) ac.mu.Unlock() if err := ac.tryAllAddrs(acCtx, addrs, connectDeadline); err != nil { + connectionAttemptsFailedMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, backendService, locality) // TODO: #7534 - Move re-resolution requests into the pick_first LB policy // to ensure one resolution request per pass instead of per subconn failure. 
ac.cc.resolveNow(resolver.ResolveNowOptions{}) @@ -1319,10 +1377,30 @@ func (ac *addrConn) resetTransportAndUnlock() { } // Success; reset backoff. ac.mu.Lock() + connectionAttemptsSucceededMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, backendService, locality) + openConnectionsMetric.Record(ac.cc.metricsRecorderList, 1, ac.cc.target, backendService, ac.securityLevel(), locality) ac.backoffIdx = 0 ac.mu.Unlock() } +type securityLevelKey struct{} + +func (ac *addrConn) securityLevel() string { + var secLevel string + if ac.transport == nil { + secLevel, _ = ac.curAddr.Attributes.Value(securityLevelKey{}).(string) + return secLevel + } + authInfo := ac.transport.AuthInfo() + if ci, ok := authInfo.(interface { + GetCommonAuthInfo() credentials.CommonAuthInfo + }); ok { + secLevel = ci.GetCommonAuthInfo().SecurityLevel.String() + ac.curAddr.Attributes = ac.curAddr.Attributes.WithValue(securityLevelKey{}, secLevel) + } + return secLevel +} + // tryAllAddrs tries to create a connection to the addresses, and stop when at // the first successful one. It returns an error if no address was successfully // connected, or updates ac appropriately with the new transport. diff --git a/internal/testutils/stats/test_metrics_recorder.go b/internal/testutils/stats/test_metrics_recorder.go index be1a06117a2f..890f8ebdaa9f 100644 --- a/internal/testutils/stats/test_metrics_recorder.go +++ b/internal/testutils/stats/test_metrics_recorder.go @@ -46,6 +46,8 @@ type TestMetricsRecorder struct { mu sync.Mutex // data is the most recent update for each metric name. data map[string]float64 + // detail contains all metric data including labels + detail map[string]MetricsData } // NewTestMetricsRecorder returns a new TestMetricsRecorder. 
@@ -58,7 +60,8 @@ func NewTestMetricsRecorder() *TestMetricsRecorder { intGaugeCh: testutils.NewChannelWithSize(10), intUpDownCountCh: testutils.NewChannelWithSize(10), - data: make(map[string]float64), + data: make(map[string]float64), + detail: make(map[string]MetricsData), } } @@ -71,6 +74,15 @@ func (r *TestMetricsRecorder) Metric(name string) (float64, bool) { return data, ok } +// MetricDetail returns entire MetricsData detail for metric which includes +// label keys and values. +func (r *TestMetricsRecorder) MetricDetail(name string) (MetricsData, bool) { + r.mu.Lock() + defer r.mu.Unlock() + detail, ok := r.detail[name] + return detail, ok +} + // ClearMetrics clears the metrics data store of the test metrics recorder. func (r *TestMetricsRecorder) ClearMetrics() { r.mu.Lock() @@ -125,32 +137,36 @@ func (r *TestMetricsRecorder) WaitForInt64CountIncr(ctx context.Context, incrWan // the internal data map with the recorded value. func (r *TestMetricsRecorder) RecordInt64Count(handle *estats.Int64CountHandle, incr int64, labels ...string) { r.intCountCh.ReceiveOrFail() - r.intCountCh.Send(MetricsData{ + detail := MetricsData{ Handle: handle.Descriptor(), IntIncr: incr, LabelKeys: append(handle.Labels, handle.OptionalLabels...), LabelVals: labels, - }) + } + r.intCountCh.Send(detail) r.mu.Lock() defer r.mu.Unlock() r.data[handle.Name] = float64(incr) + r.detail[handle.Name] = detail } // RecordInt64UpDownCount sends the metrics data to the intUpDownCountCh channel and updates // the internal data map with the recorded value. 
func (r *TestMetricsRecorder) RecordInt64UpDownCount(handle *estats.Int64UpDownCountHandle, incr int64, labels ...string) { r.intUpDownCountCh.ReceiveOrFail() - r.intUpDownCountCh.Send(MetricsData{ + detail := MetricsData{ Handle: handle.Descriptor(), IntIncr: incr, LabelKeys: append(handle.Labels, handle.OptionalLabels...), LabelVals: labels, - }) + } + r.intUpDownCountCh.Send(detail) r.mu.Lock() defer r.mu.Unlock() r.data[handle.Name] = float64(incr) + r.detail[handle.Name] = detail } // WaitForFloat64Count waits for a float count metric to be recorded and @@ -172,16 +188,18 @@ func (r *TestMetricsRecorder) WaitForFloat64Count(ctx context.Context, metricsDa // updates the internal data map with the recorded value. func (r *TestMetricsRecorder) RecordFloat64Count(handle *estats.Float64CountHandle, incr float64, labels ...string) { r.floatCountCh.ReceiveOrFail() - r.floatCountCh.Send(MetricsData{ + detail := MetricsData{ Handle: handle.Descriptor(), FloatIncr: incr, LabelKeys: append(handle.Labels, handle.OptionalLabels...), LabelVals: labels, - }) + } + r.floatCountCh.Send(detail) r.mu.Lock() defer r.mu.Unlock() r.data[handle.Name] = incr + r.detail[handle.Name] = detail } // WaitForInt64Histo waits for an int histo metric to be recorded and verifies @@ -203,16 +221,18 @@ func (r *TestMetricsRecorder) WaitForInt64Histo(ctx context.Context, metricsData // the internal data map with the recorded value. 
func (r *TestMetricsRecorder) RecordInt64Histo(handle *estats.Int64HistoHandle, incr int64, labels ...string) { r.intHistoCh.ReceiveOrFail() - r.intHistoCh.Send(MetricsData{ + detail := MetricsData{ Handle: handle.Descriptor(), IntIncr: incr, LabelKeys: append(handle.Labels, handle.OptionalLabels...), LabelVals: labels, - }) + } + r.intHistoCh.Send(detail) r.mu.Lock() defer r.mu.Unlock() r.data[handle.Name] = float64(incr) + r.detail[handle.Name] = detail } // WaitForFloat64Histo waits for a float histo metric to be recorded and @@ -234,16 +254,18 @@ func (r *TestMetricsRecorder) WaitForFloat64Histo(ctx context.Context, metricsDa // updates the internal data map with the recorded value. func (r *TestMetricsRecorder) RecordFloat64Histo(handle *estats.Float64HistoHandle, incr float64, labels ...string) { r.floatHistoCh.ReceiveOrFail() - r.floatHistoCh.Send(MetricsData{ + detail := MetricsData{ Handle: handle.Descriptor(), FloatIncr: incr, LabelKeys: append(handle.Labels, handle.OptionalLabels...), LabelVals: labels, - }) + } + r.floatHistoCh.Send(detail) r.mu.Lock() defer r.mu.Unlock() r.data[handle.Name] = incr + r.detail[handle.Name] = detail } // WaitForInt64Gauge waits for a int gauge metric to be recorded and verifies @@ -264,16 +286,18 @@ func (r *TestMetricsRecorder) WaitForInt64Gauge(ctx context.Context, metricsData // the internal data map with the recorded value. 
func (r *TestMetricsRecorder) RecordInt64Gauge(handle *estats.Int64GaugeHandle, incr int64, labels ...string) { r.intGaugeCh.ReceiveOrFail() - r.intGaugeCh.Send(MetricsData{ + detail := MetricsData{ Handle: handle.Descriptor(), IntIncr: incr, LabelKeys: append(handle.Labels, handle.OptionalLabels...), LabelVals: labels, - }) + } + r.intGaugeCh.Send(detail) r.mu.Lock() defer r.mu.Unlock() r.data[handle.Name] = float64(incr) + r.detail[handle.Name] = detail } // To implement a stats.Handler, which allows it to be set as a dial option: diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 27824793486e..36c0778a39dc 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1809,6 +1809,8 @@ func (t *http2Client) socketMetrics() *channelz.EphemeralSocketMetrics { func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr } +func (t *http2Client) AuthInfo() credentials.AuthInfo { return t.authInfo } + func (t *http2Client) incrMsgSent() { if channelz.IsOn() { t.channelz.SocketMetrics.MessagesSent.Add(1) diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 5ff83a7d7d74..6c018b08e4bc 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -610,6 +610,8 @@ type ClientTransport interface { // RemoteAddr returns the remote network address. 
RemoteAddr() net.Addr + + AuthInfo() credentials.AuthInfo } // ServerTransport is the common interface for all gRPC server-side transport diff --git a/internal/xds/balancer/clusterimpl/clusterimpl.go b/internal/xds/balancer/clusterimpl/clusterimpl.go index cdc7bf2eb579..19303fd13a69 100644 --- a/internal/xds/balancer/clusterimpl/clusterimpl.go +++ b/internal/xds/balancer/clusterimpl/clusterimpl.go @@ -295,9 +295,11 @@ func (b *clusterImplBalancer) UpdateClientConnState(s balancer.ClientConnState) return err } + newState := resolver.SetBackendServiceOnState(s.ResolverState, b.clusterName) + // Addresses and sub-balancer config are sent to sub-balancer. err = b.child.UpdateClientConnState(balancer.ClientConnState{ - ResolverState: s.ResolverState, + ResolverState: newState, BalancerConfig: parsedCfg, }) diff --git a/internal/xds/xds.go b/internal/xds/xds.go index 4d5a85ef35a8..7d43462e808e 100644 --- a/internal/xds/xds.go +++ b/internal/xds/xds.go @@ -46,6 +46,17 @@ func GetXDSHandshakeClusterName(attr *attributes.Attributes) (string, bool) { return name, ok } +// AddressToTelemetryLabels prepares a telemetry label map from resolver +// address atrributes. +func AddressToTelemetryLabels(addr resolver.Address) map[string]string { + cluster, _ := GetXDSHandshakeClusterName(addr.Attributes) + locality := LocalityString(GetLocalityID(addr)) + labels := make(map[string]string) + labels["grpc.lb.locality"] = locality + labels["grpc.lb.backend_service"] = cluster + return labels +} + // LocalityString generates a string representation of clients.Locality in the // format specified in gRFC A76. 
func LocalityString(l clients.Locality) string { diff --git a/resolver/resolver.go b/resolver/resolver.go index 8e6af9514b6d..7a9d339563f9 100644 --- a/resolver/resolver.go +++ b/resolver/resolver.go @@ -356,3 +356,19 @@ func ValidateEndpoints(endpoints []Endpoint) error { } return errors.New("endpoints list contains no addresses") } + +type backendServiceKey struct{} + +// SetBackendServiceOnState stores the backendService on the resolver state so +// that it can be retrieved in UpdateClientConnState in WRR for metrics. +func SetBackendServiceOnState(state State, backendService string) State { + state.Attributes = state.Attributes.WithValue(backendServiceKey{}, backendService) + return state +} + +// GetBackendServiceFromState provides the cluster name saved in the resolver state. +func GetBackendServiceFromState(state State) (string, bool) { + v := state.Attributes.Value(backendServiceKey{}) + name, ok := v.(string) + return name, ok +} diff --git a/stats/opentelemetry/csm/observability_test.go b/stats/opentelemetry/csm/observability_test.go index 4057a1fd85a5..09d450cbc155 100644 --- a/stats/opentelemetry/csm/observability_test.go +++ b/stats/opentelemetry/csm/observability_test.go @@ -434,7 +434,8 @@ func unaryInterceptorAttachXDSLabels(ctx context.Context, method string, req, re "csm.service_name": "service_name_val", "csm.service_namespace_name": "service_namespace_val", - "grpc.lb.locality": "grpc.lb.locality_val", + "grpc.lb.locality": "grpc.lb.locality_val", + "grpc.lb.backend_service": "grpc.lb.backend_service_val", }, }) @@ -469,7 +470,7 @@ func (s) TestXDSLabels(t *testing.T) { MetricsOptions: opentelemetry.MetricsOptions{ MeterProvider: provider, Metrics: opentelemetry.DefaultMetrics(), - OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name", "grpc.lb.locality"}, + OptionalLabels: []string{"csm.service_name", "csm.service_namespace_name", "grpc.lb.locality", "grpc.lb.backend_service"}, }, }, po), 
grpc.WithUnaryInterceptor(unaryInterceptorAttachXDSLabels)} if err := ss.Start(nil, dopts...); err != nil { @@ -498,6 +499,7 @@ func (s) TestXDSLabels(t *testing.T) { serviceNameAttr := attribute.String("csm.service_name", "service_name_val") serviceNamespaceAttr := attribute.String("csm.service_namespace_name", "service_namespace_val") localityAttr := attribute.String("grpc.lb.locality", "grpc.lb.locality_val") + backendServiceAttr := attribute.String("grpc.lb.backend_service", "grpc.lb.backend_service_val") meshIDAttr := attribute.String("csm.mesh_id", "unknown") workloadCanonicalServiceAttr := attribute.String("csm.workload_canonical_service", "unknown") remoteWorkloadTypeAttr := attribute.String("csm.remote_workload_type", "unknown") @@ -510,6 +512,7 @@ func (s) TestXDSLabels(t *testing.T) { serviceNameAttr, serviceNamespaceAttr, localityAttr, + backendServiceAttr, meshIDAttr, workloadCanonicalServiceAttr, remoteWorkloadTypeAttr, diff --git a/stats/opentelemetry/e2e_test.go b/stats/opentelemetry/e2e_test.go index 2d575bfe06c5..68b48df73e30 100644 --- a/stats/opentelemetry/e2e_test.go +++ b/stats/opentelemetry/e2e_test.go @@ -673,7 +673,7 @@ func (s) TestWRRMetrics(t *testing.T) { mo := opentelemetry.MetricsOptions{ MeterProvider: provider, Metrics: opentelemetry.DefaultMetrics().Add("grpc.lb.wrr.rr_fallback", "grpc.lb.wrr.endpoint_weight_not_yet_usable", "grpc.lb.wrr.endpoint_weight_stale", "grpc.lb.wrr.endpoint_weights"), - OptionalLabels: []string{"grpc.lb.locality"}, + OptionalLabels: []string{"grpc.lb.locality", "grpc.lb.backend_service"}, } target := fmt.Sprintf("xds:///%s", serviceName) @@ -699,6 +699,7 @@ func (s) TestWRRMetrics(t *testing.T) { targetAttr := attribute.String("grpc.target", target) localityAttr := attribute.String("grpc.lb.locality", `{region="region-1", zone="zone-1", sub_zone="subzone-1"}`) + backendServiceAttr := attribute.String("grpc.lb.backend_service", clusterName) wantMetrics := []metricdata.Metrics{ { @@ -708,7 +709,7 @@ func (s) 
TestWRRMetrics(t *testing.T) { Data: metricdata.Sum[int64]{ DataPoints: []metricdata.DataPoint[int64]{ { - Attributes: attribute.NewSet(targetAttr, localityAttr), + Attributes: attribute.NewSet(targetAttr, localityAttr, backendServiceAttr), Value: 1, // value ignored }, }, @@ -724,7 +725,7 @@ func (s) TestWRRMetrics(t *testing.T) { Data: metricdata.Sum[int64]{ DataPoints: []metricdata.DataPoint[int64]{ { - Attributes: attribute.NewSet(targetAttr, localityAttr), + Attributes: attribute.NewSet(targetAttr, localityAttr, backendServiceAttr), Value: 1, // value ignored }, }, @@ -739,7 +740,7 @@ func (s) TestWRRMetrics(t *testing.T) { Data: metricdata.Histogram[float64]{ DataPoints: []metricdata.HistogramDataPoint[float64]{ { - Attributes: attribute.NewSet(targetAttr, localityAttr), + Attributes: attribute.NewSet(targetAttr, localityAttr, backendServiceAttr), }, }, Temporality: metricdata.CumulativeTemporality, @@ -761,7 +762,7 @@ func (s) TestWRRMetrics(t *testing.T) { Data: metricdata.Sum[int64]{ DataPoints: []metricdata.DataPoint[int64]{ { - Attributes: attribute.NewSet(targetAttr, localityAttr), + Attributes: attribute.NewSet(targetAttr, localityAttr, backendServiceAttr), Value: 1, // value ignored }, }, @@ -775,6 +776,212 @@ func (s) TestWRRMetrics(t *testing.T) { } } +func (s) TestSubChannelMetrics(t *testing.T) { + cmr := orca.NewServerMetricsRecorder().(orca.CallMetricsRecorder) + backend1 := stubserver.StartTestService(t, &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { + if r := orca.CallMetricsRecorderFromContext(ctx); r != nil { + // Copy metrics from what the test set in cmr into r. 
+ sm := cmr.(orca.ServerMetricsProvider).ServerMetrics() + r.SetApplicationUtilization(sm.AppUtilization) + r.SetQPS(sm.QPS) + r.SetEPS(sm.EPS) + } + return &testpb.Empty{}, nil + }, + }, orca.CallMetricsServerOption(nil)) + port1 := itestutils.ParsePort(t, backend1.Address) + + cmr.SetQPS(10.0) + cmr.SetApplicationUtilization(1.0) + + backend2 := stubserver.StartTestService(t, &stubserver.StubServer{ + EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { + if r := orca.CallMetricsRecorderFromContext(ctx); r != nil { + // Copy metrics from what the test set in cmr into r. + sm := cmr.(orca.ServerMetricsProvider).ServerMetrics() + r.SetApplicationUtilization(sm.AppUtilization) + r.SetQPS(sm.QPS) + r.SetEPS(sm.EPS) + } + return &testpb.Empty{}, nil + }, + }, orca.CallMetricsServerOption(nil)) + port2 := itestutils.ParsePort(t, backend2.Address) + defer backend2.Stop() + const serviceName = "my-service-client-side-xds" + + // Start an xDS management server. + managementServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) + + wrrConfig := &v3wrrlocalitypb.WrrLocality{ + EndpointPickingPolicy: &v3clusterpb.LoadBalancingPolicy{ + Policies: []*v3clusterpb.LoadBalancingPolicy_Policy{ + { + TypedExtensionConfig: &v3corepb.TypedExtensionConfig{ + TypedConfig: itestutils.MarshalAny(t, &v3clientsideweightedroundrobinpb.ClientSideWeightedRoundRobin{ + EnableOobLoadReport: &wrapperspb.BoolValue{ + Value: false, + }, + // BlackoutPeriod long enough to cause load report + // weight to trigger in the scope of test case. + // WeightExpirationPeriod will cause the load report + // weight for backend 1 to expire. 
+ BlackoutPeriod: durationpb.New(5 * time.Millisecond), + WeightExpirationPeriod: durationpb.New(500 * time.Millisecond), + WeightUpdatePeriod: durationpb.New(time.Second), + ErrorUtilizationPenalty: &wrapperspb.FloatValue{Value: 1}, + }), + }, + }, + }, + }, + } + + routeConfigName := "route-" + serviceName + clusterName := "cluster-" + serviceName + endpointsName := "endpoints-" + serviceName + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(serviceName, routeConfigName)}, + Routes: []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(routeConfigName, serviceName, clusterName)}, + Clusters: []*v3clusterpb.Cluster{clusterWithLBConfiguration(t, clusterName, endpointsName, e2e.SecurityLevelNone, wrrConfig)}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: endpointsName, + Host: "localhost", + Localities: []e2e.LocalityOptions{ + { + Backends: []e2e.BackendOptions{{Ports: []uint32{port1}}, {Ports: []uint32{port2}}}, + Weight: 1, + }, + }, + })}, + } + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + reader := metric.NewManualReader() + provider := metric.NewMeterProvider(metric.WithReader(reader)) + + mo := opentelemetry.MetricsOptions{ + MeterProvider: provider, + Metrics: opentelemetry.DefaultMetrics().Add( + "grpc.subchannel.connection_attempts_succeeded", + "grpc.subchannel.open_connections", + "grpc.subchannel.disconnections", + "grpc.subchannel.connection_attempts_failed"), + OptionalLabels: []string{ + "grpc.lb.locality", + "grpc.lb.backend_service", + "grpc.security_level", + "grpc.disconnect_error"}, + } + + target := fmt.Sprintf("xds:///%s", serviceName) + cc, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(xdsResolver), 
opentelemetry.DialOption(opentelemetry.Options{MetricsOptions: mo})) + if err != nil { + t.Fatalf("Failed to dial local test server: %v", err) + } + defer cc.Close() + client := testgrpc.NewTestServiceClient(cc) + + // Make 100 RPC's. The two backends will send back load reports per call + // giving the two SubChannels weights which will eventually expire. Two + // backends needed as for only one backend, WRR does not recompute the + // scheduler. + receivedExpectedMetrics := grpcsync.NewEvent() + go func() { + for !receivedExpectedMetrics.HasFired() && ctx.Err() == nil { + client.EmptyCall(ctx, &testpb.Empty{}) + time.Sleep(2 * time.Millisecond) + } + }() + + targetAttr := attribute.String("grpc.target", target) + localityAttr := attribute.String("grpc.lb.locality", `{region="region-1", zone="zone-1", sub_zone="subzone-1"}`) + backendServiceAttr := attribute.String("grpc.lb.backend_service", clusterName) + disconnectionReasonAttr := attribute.String("grpc.disconnect_error", "unknown") + securityLevelAttr := attribute.String("grpc.security_level", "NoSecurity") + + wantMetrics := []metricdata.Metrics{ + { + Name: "grpc.subchannel.connection_attempts_succeeded", + Description: "EXPERIMENTAL. Number of successful connection attempts.", + Unit: "{attempt}", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(targetAttr, backendServiceAttr, localityAttr), + Value: 1, // value ignored + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.subchannel.open_connections", + Description: "EXPERIMENTAL. 
Number of open connections.", + Unit: "{attempt}", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(targetAttr, backendServiceAttr, securityLevelAttr, localityAttr), + Value: 1, // value ignored + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + }, + }, + } + if err := pollForWantMetrics(ctx, t, reader, wantMetrics); err != nil { + t.Fatal(err) + } + backend1.Stop() + receivedExpectedMetrics.Fire() + disconnectionWantMetrics := []metricdata.Metrics{ + { + Name: "grpc.subchannel.disconnections", + Description: "EXPERIMENTAL. Number of times the selected subchannel becomes disconnected.", + Unit: "{disconnection}", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(targetAttr, backendServiceAttr, localityAttr, disconnectionReasonAttr), + Value: 1, + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + { + Name: "grpc.subchannel.connection_attempts_failed", + Description: "EXPERIMENTAL. Number of failed connection attempts.", + Unit: "{attempt}", + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(targetAttr, backendServiceAttr, localityAttr), + Value: 1, // value ignored + }, + }, + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + }, + }, + } + + if err := pollForWantMetrics(ctx, t, reader, disconnectionWantMetrics); err != nil { + t.Fatal(err) + } +} + // pollForWantMetrics polls for the wantMetrics to show up on reader. Returns an // error if metric is present but not equal to expected, or if the wantMetrics // do not show up during the context timeout.