From b866b9ceba1983c403010ad664a03d1f1e070b6b Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Mon, 17 Nov 2025 08:33:31 +0530 Subject: [PATCH 1/9] change xds_resolver to use dependency manager --- internal/xds/resolver/helpers_test.go | 20 -- internal/xds/resolver/watch_service.go | 92 ------ internal/xds/resolver/xds_resolver.go | 268 ++++-------------- internal/xds/resolver/xds_resolver_test.go | 64 ----- .../xds/xdsdepmgr/xds_dependency_manager.go | 187 ++++++------ 5 files changed, 136 insertions(+), 495 deletions(-) delete mode 100644 internal/xds/resolver/watch_service.go diff --git a/internal/xds/resolver/helpers_test.go b/internal/xds/resolver/helpers_test.go index 243bbfd99f45..174b03883445 100644 --- a/internal/xds/resolver/helpers_test.go +++ b/internal/xds/resolver/helpers_test.go @@ -191,26 +191,6 @@ func verifyNoUpdateFromResolver(ctx context.Context, t *testing.T, stateCh chan } } -// waitForErrorFromResolver waits for the resolver to push an error and verifies -// that it matches the expected error and contains the expected node ID. 
-func waitForErrorFromResolver(ctx context.Context, errCh chan error, wantErr, wantNodeID string) error { - select { - case <-ctx.Done(): - return fmt.Errorf("timeout when waiting for error to be propagated to the ClientConn") - case gotErr := <-errCh: - if gotErr == nil { - return fmt.Errorf("got nil error from resolver, want %q", wantErr) - } - if !strings.Contains(gotErr.Error(), wantErr) { - return fmt.Errorf("got error from resolver %q, want %q", gotErr, wantErr) - } - if !strings.Contains(gotErr.Error(), wantNodeID) { - return fmt.Errorf("got error from resolver %q, want nodeID %q", gotErr, wantNodeID) - } - } - return nil -} - func verifyResolverError(gotErr error, wantCode codes.Code, wantErr, wantNodeID string) error { if gotErr == nil { return fmt.Errorf("got nil error from resolver, want error with code %v", wantCode) diff --git a/internal/xds/resolver/watch_service.go b/internal/xds/resolver/watch_service.go deleted file mode 100644 index 3fb122b001c0..000000000000 --- a/internal/xds/resolver/watch_service.go +++ /dev/null @@ -1,92 +0,0 @@ -/* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- * - */ - -package resolver - -import ( - "context" - - "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" -) - -type listenerWatcher struct { - resourceName string - cancel func() - parent *xdsResolver -} - -func newListenerWatcher(resourceName string, parent *xdsResolver) *listenerWatcher { - lw := &listenerWatcher{resourceName: resourceName, parent: parent} - lw.cancel = xdsresource.WatchListener(parent.xdsClient, resourceName, lw) - return lw -} - -func (l *listenerWatcher) ResourceChanged(update *xdsresource.ListenerUpdate, onDone func()) { - handleUpdate := func(context.Context) { l.parent.onListenerResourceUpdate(update); onDone() } - l.parent.serializer.ScheduleOr(handleUpdate, onDone) -} - -func (l *listenerWatcher) ResourceError(err error, onDone func()) { - handleError := func(context.Context) { l.parent.onListenerResourceError(err); onDone() } - l.parent.serializer.ScheduleOr(handleError, onDone) -} - -func (l *listenerWatcher) AmbientError(err error, onDone func()) { - handleError := func(context.Context) { l.parent.onListenerResourceAmbientError(err); onDone() } - l.parent.serializer.ScheduleOr(handleError, onDone) -} - -func (l *listenerWatcher) stop() { - l.cancel() - l.parent.logger.Infof("Canceling watch on Listener resource %q", l.resourceName) -} - -type routeConfigWatcher struct { - resourceName string - cancel func() - parent *xdsResolver -} - -func newRouteConfigWatcher(resourceName string, parent *xdsResolver) *routeConfigWatcher { - rw := &routeConfigWatcher{resourceName: resourceName, parent: parent} - rw.cancel = xdsresource.WatchRouteConfig(parent.xdsClient, resourceName, rw) - return rw -} - -func (r *routeConfigWatcher) ResourceChanged(u *xdsresource.RouteConfigUpdate, onDone func()) { - handleUpdate := func(context.Context) { - r.parent.onRouteConfigResourceUpdate(r.resourceName, u) - onDone() - } - r.parent.serializer.ScheduleOr(handleUpdate, onDone) -} - -func (r *routeConfigWatcher) ResourceError(err error, onDone func()) 
{ - handleError := func(context.Context) { r.parent.onRouteConfigResourceError(r.resourceName, err); onDone() } - r.parent.serializer.ScheduleOr(handleError, onDone) -} - -func (r *routeConfigWatcher) AmbientError(err error, onDone func()) { - handleError := func(context.Context) { r.parent.onRouteConfigResourceAmbientError(r.resourceName, err); onDone() } - r.parent.serializer.ScheduleOr(handleError, onDone) -} - -func (r *routeConfigWatcher) stop() { - r.cancel() - r.parent.logger.Infof("Canceling watch on RouteConfiguration resource %q", r.resourceName) -} diff --git a/internal/xds/resolver/xds_resolver.go b/internal/xds/resolver/xds_resolver.go index c28892a6f28c..7df74dd4c02d 100644 --- a/internal/xds/resolver/xds_resolver.go +++ b/internal/xds/resolver/xds_resolver.go @@ -36,6 +36,7 @@ import ( rinternal "google.golang.org/grpc/internal/xds/resolver/internal" "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" + "google.golang.org/grpc/internal/xds/xdsdepmgr" "google.golang.org/grpc/resolver" ) @@ -152,9 +153,8 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon if err != nil { return nil, err } - r.dataplaneAuthority = opts.Authority r.ldsResourceName = bootstrap.PopulateResourceTemplate(template, target.Endpoint()) - r.listenerWatcher = newListenerWatcher(r.ldsResourceName, r) + r.dm = xdsdepmgr.New(r.ldsResourceName, opts.Authority, r.xdsClient, r) return r, nil } @@ -217,33 +217,13 @@ type xdsResolver struct { // A random number which uniquely identifies the channel which owns this // resolver. channelID uint64 + dm *xdsdepmgr.DependencyManager - // All methods on the xdsResolver type except for the ones invoked by gRPC, - // i.e ResolveNow() and Close(), are guaranteed to execute in the context of - // this serializer's callback. 
And since the serializer guarantees mutual - // exclusion among these callbacks, we can get by without any mutexes to - // access all of the below defined state. The only exception is Close(), - // which does access some of this shared state, but it does so after - // cancelling the context passed to the serializer. serializer *grpcsync.CallbackSerializer serializerCancel context.CancelFunc - // dataplaneAuthority is the authority used for the data plane connections, - // which is also used to select the VirtualHost within the xDS - // RouteConfiguration. This is %-encoded to match with VirtualHost Domain - // in xDS RouteConfiguration. - dataplaneAuthority string - - ldsResourceName string - listenerWatcher *listenerWatcher - listenerUpdateRecvd bool - currentListener *xdsresource.ListenerUpdate - - rdsResourceName string - routeConfigWatcher *routeConfigWatcher - routeConfigUpdateRecvd bool - currentRouteConfig *xdsresource.RouteConfigUpdate - currentVirtualHost *xdsresource.VirtualHost // Matched virtual host for quick access. + ldsResourceName string + rdsResourceName string // activeClusters is a map from cluster name to information about the // cluster that includes a ref count and load balancing configuration. @@ -262,22 +242,53 @@ func (r *xdsResolver) Close() { r.serializerCancel() <-r.serializer.Done() - // Note that Close needs to check for nils even if some of them are always - // set in the constructor. This is because the constructor defers Close() in - // error cases, and the fields might not be set when the error happens. 
- - if r.listenerWatcher != nil { - r.listenerWatcher.stop() - } - if r.routeConfigWatcher != nil { - r.routeConfigWatcher.stop() + if r.dm != nil { + r.dm.Close() } + if r.xdsClientClose != nil { r.xdsClientClose() } r.logger.Infof("Shutdown") } +// Update is called when there is a new xDS config available from the dependency +// manager and does the following: +// - creates a new config selector (this involves incrementing references to +// clusters owned by this config selector). +// - stops the old config selector (this involves decrementing references to +// clusters owned by this config selector). +// - prunes active clusters and pushes a new service config to the channel. +// - updates the current config selector used by the resolver. +func (r *xdsResolver) Update(config *xdsresource.XDSConfig) { + r.serializer.TrySchedule(func(context.Context) { + r.rdsResourceName = config.Listener.RouteConfigName + cs, err := r.newConfigSelector(config) + if err != nil { + r.onResourceError(err) + return + } + if !r.sendNewServiceConfig(cs) { + // Channel didn't like the update we provided (unexpected); erase + // this config selector and ignore this update, continuing with + // the previous config selector. + cs.stop() + return + } + + if r.curConfigSelector != nil { + r.curConfigSelector.stop() + } + r.curConfigSelector = cs + }) +} + +func (r *xdsResolver) Error(err error) { + r.serializer.TrySchedule(func(context.Context) { + r.onResourceError(err) + }) +} + // sendNewServiceConfig prunes active clusters, generates a new service config // based on the current set of active clusters, and sends an update to the // channel with that service config and the provided config selector. Returns @@ -328,7 +339,7 @@ func (r *xdsResolver) sendNewServiceConfig(cs stoppableConfigSelector) bool { // r.activeClusters for previously-unseen clusters. // // Only executed in the context of a serializer callback. 
-func (r *xdsResolver) newConfigSelector() (*configSelector, error) { +func (r *xdsResolver) newConfigSelector(config *xdsresource.XDSConfig) (*configSelector, error) { cs := &configSelector{ channelID: r.channelID, xdsNodeID: r.xdsClient.BootstrapConfig().Node().GetId(), @@ -338,25 +349,25 @@ func (r *xdsResolver) newConfigSelector() (*configSelector, error) { }) }, virtualHost: virtualHost{ - retryConfig: r.currentVirtualHost.RetryConfig, + retryConfig: config.VirtualHost.RetryConfig, }, - routes: make([]route, len(r.currentVirtualHost.Routes)), + routes: make([]route, len(config.VirtualHost.Routes)), clusters: make(map[string]*clusterInfo), - httpFilterConfig: r.currentListener.HTTPFilters, + httpFilterConfig: config.Listener.HTTPFilters, } - for i, rt := range r.currentVirtualHost.Routes { + for i, rt := range config.VirtualHost.Routes { clusters := rinternal.NewWRR.(func() wrr.WRR)() if rt.ClusterSpecifierPlugin != "" { clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin clusters.Add(&routeCluster{name: clusterName}, 1) ci := r.addOrGetActiveClusterInfo(clusterName) - ci.cfg = xdsChildConfig{ChildPolicy: balancerConfig(r.currentRouteConfig.ClusterSpecifierPlugins[rt.ClusterSpecifierPlugin])} + ci.cfg = xdsChildConfig{ChildPolicy: balancerConfig(config.RouteConfig.ClusterSpecifierPlugins[rt.ClusterSpecifierPlugin])} cs.clusters[clusterName] = ci } else { for _, wc := range rt.WeightedClusters { clusterName := clusterPrefix + wc.Name - interceptor, err := newInterceptor(r.currentListener.HTTPFilters, wc.HTTPFilterConfigOverride, rt.HTTPFilterConfigOverride, r.currentVirtualHost.HTTPFilterConfigOverride) + interceptor, err := newInterceptor(config.Listener.HTTPFilters, wc.HTTPFilterConfigOverride, rt.HTTPFilterConfigOverride, config.VirtualHost.HTTPFilterConfigOverride) if err != nil { return nil, err } @@ -374,7 +385,7 @@ func (r *xdsResolver) newConfigSelector() (*configSelector, error) { cs.routes[i].m = xdsresource.RouteToMatcher(rt) 
cs.routes[i].actionType = rt.ActionType if rt.MaxStreamDuration == nil { - cs.routes[i].maxStreamDuration = r.currentListener.MaxStreamDuration + cs.routes[i].maxStreamDuration = config.Listener.MaxStreamDuration } else { cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration } @@ -422,75 +433,6 @@ type clusterInfo struct { cfg xdsChildConfig } -// Determines if the xdsResolver has received all required configuration, i.e -// Listener and RouteConfiguration resources, from the management server, and -// whether a matching virtual host was found in the RouteConfiguration resource. -func (r *xdsResolver) resolutionComplete() bool { - return r.listenerUpdateRecvd && r.routeConfigUpdateRecvd && r.currentVirtualHost != nil -} - -// onResolutionComplete performs the following actions when resolution is -// complete, i.e Listener and RouteConfiguration resources have been received -// from the management server and a matching virtual host is found in the -// latter. -// - creates a new config selector (this involves incrementing references to -// clusters owned by this config selector). -// - stops the old config selector (this involves decrementing references to -// clusters owned by this config selector). -// - prunes active clusters and pushes a new service config to the channel. -// - updates the current config selector used by the resolver. -// -// Only executed in the context of a serializer callback. -func (r *xdsResolver) onResolutionComplete() { - if !r.resolutionComplete() { - return - } - - cs, err := r.newConfigSelector() - if err != nil { - // Send an erroring config selector in this case that fails RPCs. - r.onResourceError(fmt.Errorf("xds: failed to create config selector: %v", err)) - return - } - if !r.sendNewServiceConfig(cs) { - // Channel didn't like the update we provided (unexpected); erase - // this config selector and ignore this update, continuing with - // the previous config selector. 
- cs.stop() - return - } - - if r.curConfigSelector != nil { - r.curConfigSelector.stop() - } - r.curConfigSelector = cs -} - -func (r *xdsResolver) applyRouteConfigUpdate(update *xdsresource.RouteConfigUpdate) { - matchVh := xdsresource.FindBestMatchingVirtualHost(r.dataplaneAuthority, update.VirtualHosts) - if matchVh == nil { - // TODO(purnesh42h): Should this be a resource or ambient error? Note - // that its being called only from resource update methods when we have - // finished removing the previous update. - r.onAmbientError(fmt.Errorf("no matching virtual host found for %q", r.dataplaneAuthority)) - return - } - r.currentRouteConfig = update - r.currentVirtualHost = matchVh - r.routeConfigUpdateRecvd = true - - r.onResolutionComplete() -} - -// onAmbientError propagates the error up to the channel. And since this is -// invoked only for non resource errors, we don't have to update resolver -// state and we can keep using the old config. -// -// Only executed in the context of a serializer callback. -func (r *xdsResolver) onAmbientError(err error) { - r.cc.ReportError(err) -} - // Contains common functionality to be executed when resources of either type // are removed. // @@ -513,107 +455,3 @@ func (r *xdsResolver) onResourceError(err error) { } r.curConfigSelector = cs } - -// Only executed in the context of a serializer callback. -func (r *xdsResolver) onListenerResourceUpdate(update *xdsresource.ListenerUpdate) { - if r.logger.V(2) { - r.logger.Infof("Received update for Listener resource %q: %v", r.ldsResourceName, pretty.ToJSON(update)) - } - - r.currentListener = update - r.listenerUpdateRecvd = true - - if update.InlineRouteConfig != nil { - // If there was a previous route config watcher because of a non-inline - // route configuration, cancel it. 
- r.rdsResourceName = "" - if r.routeConfigWatcher != nil { - r.routeConfigWatcher.stop() - r.routeConfigWatcher = nil - } - - r.applyRouteConfigUpdate(update.InlineRouteConfig) - return - } - - // We get here only if there was no inline route configuration. - - // If the route config name has not changed, send an update with existing - // route configuration and the newly received listener configuration. - if r.rdsResourceName == update.RouteConfigName { - r.onResolutionComplete() - return - } - - // If the route config name has changed, cancel the old watcher and start a - // new one. At this point, since we have not yet resolved the new route - // config name, we don't send an update to the channel, and therefore - // continue using the old route configuration (if received) until the new - // one is received. - r.rdsResourceName = update.RouteConfigName - if r.routeConfigWatcher != nil { - r.routeConfigWatcher.stop() - r.currentVirtualHost = nil - r.routeConfigUpdateRecvd = false - } - r.routeConfigWatcher = newRouteConfigWatcher(r.rdsResourceName, r) -} - -func (r *xdsResolver) onListenerResourceAmbientError(err error) { - if r.logger.V(2) { - r.logger.Infof("Received ambient error for Listener resource %q: %v", r.ldsResourceName, err) - } - r.onAmbientError(err) -} - -// Only executed in the context of a serializer callback. -func (r *xdsResolver) onListenerResourceError(err error) { - if r.logger.V(2) { - r.logger.Infof("Received resource error for Listener resource %q: %v", r.ldsResourceName, err) - } - - r.listenerUpdateRecvd = false - if r.routeConfigWatcher != nil { - r.routeConfigWatcher.stop() - } - r.rdsResourceName = "" - r.currentVirtualHost = nil - r.routeConfigUpdateRecvd = false - r.routeConfigWatcher = nil - - r.onResourceError(err) -} - -// Only executed in the context of a serializer callback. 
-func (r *xdsResolver) onRouteConfigResourceUpdate(name string, update *xdsresource.RouteConfigUpdate) { - if r.logger.V(2) { - r.logger.Infof("Received update for RouteConfiguration resource %q: %v", name, pretty.ToJSON(update)) - } - - if r.rdsResourceName != name { - // Drop updates from canceled watchers. - return - } - - r.applyRouteConfigUpdate(update) -} - -// Only executed in the context of a serializer callback. -func (r *xdsResolver) onRouteConfigResourceAmbientError(name string, err error) { - if r.logger.V(2) { - r.logger.Infof("Received ambient error for RouteConfiguration resource %q: %v", name, err) - } - r.onAmbientError(err) -} - -// Only executed in the context of a serializer callback. -func (r *xdsResolver) onRouteConfigResourceError(name string, err error) { - if r.logger.V(2) { - r.logger.Infof("Received resource error for RouteConfiguration resource %q: %v", name, err) - } - - if r.rdsResourceName != name { - return - } - r.onResourceError(err) -} diff --git a/internal/xds/resolver/xds_resolver_test.go b/internal/xds/resolver/xds_resolver_test.go index eaf2252fd65b..4818c28181c4 100644 --- a/internal/xds/resolver/xds_resolver_test.go +++ b/internal/xds/resolver/xds_resolver_test.go @@ -359,70 +359,6 @@ func (s) TestResolverBadServiceUpdate_NACKedWithoutCache(t *testing.T) { } } -// Tests the case where a resource, present in cache, returned by the -// management server is NACKed by the xDS client, which then returns -// an update containing an ambient error to the resolver. Verifies that the -// update is propagated to the ClientConn by the resolver. It tests the -// case where the resolver gets a good update first, and an error -// after the good update. The test also verifies that these are propagated to -// the ClientConn and that RPC succeeds as expected after receiving good update -// as well as ambient error. -func (s) TestResolverBadServiceUpdate_NACKedWithCache(t *testing.T) { - // Spin up an xDS management server for the test. 
- ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() - nodeID := uuid.New().String() - mgmtServer, _, _, bc := setupManagementServerForTest(t, nodeID) - - stateCh, errCh, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc) - - // Configure good listener and route configuration resources on the - // management server. - listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} - routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)} - configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) - - // Expect a good update from the resolver. - cs := verifyUpdateFromResolver(ctx, t, stateCh, wantServiceConfig(defaultTestClusterName)) - - // "Make an RPC" by invoking the config selector. - _, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) - if err != nil { - t.Fatalf("cs.SelectConfig(): %v", err) - } - - // Configure a listener resource that is expected to be NACKed because it - // does not contain the `RouteSpecifier` field in the HTTPConnectionManager. - hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ - HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, - }) - lis := &v3listenerpb.Listener{ - Name: defaultTestServiceName, - ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, - FilterChains: []*v3listenerpb.FilterChain{{ - Name: "filter-chain-name", - Filters: []*v3listenerpb.Filter{{ - Name: wellknown.HTTPConnectionManager, - ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, - }}, - }}, - } - - // Expect an error update from the resolver. Since the resource is cached, - // it should be received as an ambient error. 
- configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{lis}, nil) - if err := waitForErrorFromResolver(ctx, errCh, "no RouteSpecifier", nodeID); err != nil { - t.Fatal(err) - } - - // "Make an RPC" by invoking the config selector which should succeed by - // continuing to use the previously cached resource. - _, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) - if err != nil { - t.Fatalf("cs.SelectConfig(): %v", err) - } -} - // TestResolverGoodServiceUpdate tests the case where the resource returned by // the management server is ACKed by the xDS client, which then returns a good // service update to the resolver. The test verifies that the service config diff --git a/internal/xds/xdsdepmgr/xds_dependency_manager.go b/internal/xds/xdsdepmgr/xds_dependency_manager.go index fae5ca962d42..e06d0b1ea185 100644 --- a/internal/xds/xdsdepmgr/xds_dependency_manager.go +++ b/internal/xds/xdsdepmgr/xds_dependency_manager.go @@ -19,11 +19,12 @@ package xdsdepmgr import ( + "context" "fmt" - "sync" "google.golang.org/grpc/grpclog" internalgrpclog "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" ) @@ -69,9 +70,9 @@ type DependencyManager struct { dataplaneAuthority string nodeID string - // All the fields below are protected by mu. - mu sync.Mutex - stopped bool + // All the fields below are accessed only from the callback serializer. 
+ serializer *grpcsync.CallbackSerializer + serializerCancel context.CancelFunc listenerWatcher *listenerWatcher currentListenerUpdate *xdsresource.ListenerUpdate @@ -99,6 +100,9 @@ func New(listenerName, dataplaneAuthority string, xdsClient xdsclient.XDSClient, nodeID: xdsClient.BootstrapConfig().Node().GetId(), } dm.logger = prefixLogger(dm) + ctx, cancel := context.WithCancel(context.Background()) + dm.serializer = grpcsync.NewCallbackSerializer(ctx) + dm.serializerCancel = cancel // Start the listener watch. Listener watch will start the other resource // watches as needed. @@ -108,14 +112,9 @@ func New(listenerName, dataplaneAuthority string, xdsClient xdsclient.XDSClient, // Close cancels all registered resource watches. func (m *DependencyManager) Close() { - m.mu.Lock() - defer m.mu.Unlock() + m.serializerCancel() + <-m.serializer.Done() - if m.stopped { - return - } - - m.stopped = true if m.listenerWatcher != nil { m.listenerWatcher.stop() } @@ -153,70 +152,60 @@ func (m *DependencyManager) applyRouteConfigUpdateLocked(update *xdsresource.Rou } func (m *DependencyManager) onListenerResourceUpdate(update *xdsresource.ListenerUpdate, onDone func()) { - m.mu.Lock() - defer m.mu.Unlock() - - defer onDone() - if m.stopped { - return - } + m.serializer.ScheduleOr(func(context.Context) { + defer onDone() + if m.logger.V(2) { + m.logger.Infof("Received update for Listener resource %q: %+v", m.ldsResourceName, update) + } - if m.logger.V(2) { - m.logger.Infof("Received update for Listener resource %q: %+v", m.ldsResourceName, update) - } + m.currentListenerUpdate = update + + if update.InlineRouteConfig != nil { + // If there was a previous route config watcher because of a non-inline + // route configuration, cancel it. 
+ m.rdsResourceName = "" + if m.routeConfigWatcher != nil { + m.routeConfigWatcher.stop() + m.routeConfigWatcher = nil + } + m.applyRouteConfigUpdateLocked(update.InlineRouteConfig) + return + } - m.currentListenerUpdate = update + // We get here only if there was no inline route configuration. If the route + // config name has not changed, send an update with existing route + // configuration and the newly received listener configuration. + if m.rdsResourceName == update.RouteConfigName { + m.maybeSendUpdateLocked() + return + } - if update.InlineRouteConfig != nil { - // If there was a previous route config watcher because of a non-inline - // route configuration, cancel it. - m.rdsResourceName = "" + // If the route config name has changed, cancel the old watcher and start a + // new one. At this point, since the new route config name has not yet been + // resolved, no update is sent to the channel, and therefore the old route + // configuration (if received) is used until the new one is received. + m.rdsResourceName = update.RouteConfigName if m.routeConfigWatcher != nil { m.routeConfigWatcher.stop() - m.routeConfigWatcher = nil + m.currentVirtualHost = nil } - m.applyRouteConfigUpdateLocked(update.InlineRouteConfig) - return - } - - // We get here only if there was no inline route configuration. If the route - // config name has not changed, send an update with existing route - // configuration and the newly received listener configuration. - if m.rdsResourceName == update.RouteConfigName { - m.maybeSendUpdateLocked() - return - } - - // If the route config name has changed, cancel the old watcher and start a - // new one. At this point, since the new route config name has not yet been - // resolved, no update is sent to the channel, and therefore the old route - // configuration (if received) is used until the new one is received. 
- m.rdsResourceName = update.RouteConfigName - if m.routeConfigWatcher != nil { - m.routeConfigWatcher.stop() - m.currentVirtualHost = nil - } - m.routeConfigWatcher = newRouteConfigWatcher(m.rdsResourceName, m) + m.routeConfigWatcher = newRouteConfigWatcher(m.rdsResourceName, m) + }, onDone) } func (m *DependencyManager) onListenerResourceError(err error, onDone func()) { - m.mu.Lock() - defer m.mu.Unlock() + m.serializer.ScheduleOr(func(context.Context) { + defer onDone() + m.logger.Warningf("Received resource error for Listener resource %q: %v", m.ldsResourceName, m.annotateErrorWithNodeID(err)) - defer onDone() - if m.stopped { - return - } - - m.logger.Warningf("Received resource error for Listener resource %q: %v", m.ldsResourceName, m.annotateErrorWithNodeID(err)) - - if m.routeConfigWatcher != nil { - m.routeConfigWatcher.stop() - } - m.rdsResourceName = "" - m.currentVirtualHost = nil - m.routeConfigWatcher = nil - m.watcher.Error(fmt.Errorf("listener resource error: %v", m.annotateErrorWithNodeID(err))) + if m.routeConfigWatcher != nil { + m.routeConfigWatcher.stop() + } + m.rdsResourceName = "" + m.currentVirtualHost = nil + m.routeConfigWatcher = nil + m.watcher.Error(fmt.Errorf("listener resource error: %v", m.annotateErrorWithNodeID(err))) + }, onDone) } // onListenerResourceAmbientError handles ambient errors received from the @@ -224,42 +213,34 @@ func (m *DependencyManager) onListenerResourceError(err error, onDone func()) { // state of the resource, no change is made to the current configuration and the // errors are only logged for visibility. 
func (m *DependencyManager) onListenerResourceAmbientError(err error, onDone func()) { - m.mu.Lock() - defer m.mu.Unlock() - - defer onDone() - if m.stopped { - return - } - - m.logger.Warningf("Listener resource ambient error: %v", m.annotateErrorWithNodeID(err)) + m.serializer.ScheduleOr(func(context.Context) { + defer onDone() + m.logger.Warningf("Listener resource ambient error: %v", m.annotateErrorWithNodeID(err)) + }, onDone) } func (m *DependencyManager) onRouteConfigResourceUpdate(resourceName string, update *xdsresource.RouteConfigUpdate, onDone func()) { - m.mu.Lock() - defer m.mu.Unlock() - - defer onDone() - if m.stopped || m.rdsResourceName != resourceName { - return - } - - if m.logger.V(2) { - m.logger.Infof("Received update for RouteConfiguration resource %q: %+v", resourceName, update) - } - m.applyRouteConfigUpdateLocked(update) + m.serializer.ScheduleOr(func(context.Context) { + defer onDone() + if m.rdsResourceName != resourceName { + return + } + if m.logger.V(2) { + m.logger.Infof("Received update for RouteConfiguration resource %q: %+v", resourceName, update) + } + m.applyRouteConfigUpdateLocked(update) + }, onDone) } func (m *DependencyManager) onRouteConfigResourceError(resourceName string, err error, onDone func()) { - m.mu.Lock() - defer m.mu.Unlock() - - defer onDone() - if m.stopped || m.rdsResourceName != resourceName { - return - } - m.logger.Warningf("Received resource error for RouteConfiguration resource %q: %v", resourceName, m.annotateErrorWithNodeID(err)) - m.watcher.Error(fmt.Errorf("route resource error: %v", m.annotateErrorWithNodeID(err))) + m.serializer.ScheduleOr(func(context.Context) { + defer onDone() + if m.rdsResourceName != resourceName { + return + } + m.logger.Warningf("Received resource error for RouteConfiguration resource %q: %v", resourceName, m.annotateErrorWithNodeID(err)) + m.watcher.Error(fmt.Errorf("route resource error: %v", m.annotateErrorWithNodeID(err))) + }, onDone) } // onRouteResourceAmbientError 
handles ambient errors received from the route @@ -267,13 +248,11 @@ func (m *DependencyManager) onRouteConfigResourceError(resourceName string, err // resource, no change is made to the current configuration and the errors are // only logged for visibility. func (m *DependencyManager) onRouteConfigResourceAmbientError(resourceName string, err error, onDone func()) { - m.mu.Lock() - defer m.mu.Unlock() - - defer onDone() - if m.stopped || m.rdsResourceName != resourceName { - return - } - - m.logger.Warningf("Route resource ambient error %q: %v", resourceName, m.annotateErrorWithNodeID(err)) + m.serializer.ScheduleOr(func(context.Context) { + defer onDone() + if m.rdsResourceName != resourceName { + return + } + m.logger.Warningf("Route resource ambient error %q: %v", resourceName, m.annotateErrorWithNodeID(err)) + }, onDone) } From 84a97338ac3f6a4d715d0afbde4696f2d150a3bb Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Mon, 17 Nov 2025 15:27:05 +0530 Subject: [PATCH 2/9] callbackserializer --- .../clients/xdsclient/clientimpl_watchers.go | 9 +- .../xds/xdsdepmgr/xds_dependency_manager.go | 187 ++++++++++-------- 2 files changed, 111 insertions(+), 85 deletions(-) diff --git a/internal/xds/clients/xdsclient/clientimpl_watchers.go b/internal/xds/clients/xdsclient/clientimpl_watchers.go index 68b29295ce28..244464d4b886 100644 --- a/internal/xds/clients/xdsclient/clientimpl_watchers.go +++ b/internal/xds/clients/xdsclient/clientimpl_watchers.go @@ -19,6 +19,7 @@ package xdsclient import ( + "context" "fmt" "google.golang.org/grpc/internal/xds/clients/xdsclient/internal/xdsresource" @@ -62,7 +63,9 @@ func (c *XDSClient) WatchResource(typeURL, resourceName string, watcher Resource rType, ok := c.config.ResourceTypes[typeURL] if !ok { logger.Warningf("ResourceType implementation for resource type url %v is not found", rType.TypeURL) - watcher.ResourceError(fmt.Errorf("ResourceType implementation for resource type url %v is not found", rType.TypeURL), func() {}) + 
c.serializer.TrySchedule(func(context.Context) { + watcher.ResourceError(fmt.Errorf("ResourceType implementation for resource type url %v is not found", rType.TypeURL), func() {}) + }) return func() {} } @@ -70,7 +73,9 @@ func (c *XDSClient) WatchResource(typeURL, resourceName string, watcher Resource a := c.getAuthorityForResource(n) if a == nil { logger.Warningf("Watch registered for name %q of type %q, authority %q is not found", rType.TypeName, resourceName, n.Authority) - watcher.ResourceError(fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName), func() {}) + c.serializer.TrySchedule(func(context.Context) { + watcher.ResourceError(fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName), func() {}) + }) return func() {} } // The watchResource method on the authority is invoked with n.String() diff --git a/internal/xds/xdsdepmgr/xds_dependency_manager.go b/internal/xds/xdsdepmgr/xds_dependency_manager.go index e06d0b1ea185..fae5ca962d42 100644 --- a/internal/xds/xdsdepmgr/xds_dependency_manager.go +++ b/internal/xds/xdsdepmgr/xds_dependency_manager.go @@ -19,12 +19,11 @@ package xdsdepmgr import ( - "context" "fmt" + "sync" "google.golang.org/grpc/grpclog" internalgrpclog "google.golang.org/grpc/internal/grpclog" - "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/xds/xdsclient" "google.golang.org/grpc/internal/xds/xdsclient/xdsresource" ) @@ -70,9 +69,9 @@ type DependencyManager struct { dataplaneAuthority string nodeID string - // All the fields below are accessed only from the callback serializer. - serializer *grpcsync.CallbackSerializer - serializerCancel context.CancelFunc + // All the fields below are protected by mu. 
+ mu sync.Mutex + stopped bool listenerWatcher *listenerWatcher currentListenerUpdate *xdsresource.ListenerUpdate @@ -100,9 +99,6 @@ func New(listenerName, dataplaneAuthority string, xdsClient xdsclient.XDSClient, nodeID: xdsClient.BootstrapConfig().Node().GetId(), } dm.logger = prefixLogger(dm) - ctx, cancel := context.WithCancel(context.Background()) - dm.serializer = grpcsync.NewCallbackSerializer(ctx) - dm.serializerCancel = cancel // Start the listener watch. Listener watch will start the other resource // watches as needed. @@ -112,9 +108,14 @@ func New(listenerName, dataplaneAuthority string, xdsClient xdsclient.XDSClient, // Close cancels all registered resource watches. func (m *DependencyManager) Close() { - m.serializerCancel() - <-m.serializer.Done() + m.mu.Lock() + defer m.mu.Unlock() + if m.stopped { + return + } + + m.stopped = true if m.listenerWatcher != nil { m.listenerWatcher.stop() } @@ -152,60 +153,70 @@ func (m *DependencyManager) applyRouteConfigUpdateLocked(update *xdsresource.Rou } func (m *DependencyManager) onListenerResourceUpdate(update *xdsresource.ListenerUpdate, onDone func()) { - m.serializer.ScheduleOr(func(context.Context) { - defer onDone() - if m.logger.V(2) { - m.logger.Infof("Received update for Listener resource %q: %+v", m.ldsResourceName, update) - } + m.mu.Lock() + defer m.mu.Unlock() - m.currentListenerUpdate = update - - if update.InlineRouteConfig != nil { - // If there was a previous route config watcher because of a non-inline - // route configuration, cancel it. - m.rdsResourceName = "" - if m.routeConfigWatcher != nil { - m.routeConfigWatcher.stop() - m.routeConfigWatcher = nil - } - m.applyRouteConfigUpdateLocked(update.InlineRouteConfig) - return - } + defer onDone() + if m.stopped { + return + } - // We get here only if there was no inline route configuration. If the route - // config name has not changed, send an update with existing route - // configuration and the newly received listener configuration. 
- if m.rdsResourceName == update.RouteConfigName { - m.maybeSendUpdateLocked() - return - } + if m.logger.V(2) { + m.logger.Infof("Received update for Listener resource %q: %+v", m.ldsResourceName, update) + } - // If the route config name has changed, cancel the old watcher and start a - // new one. At this point, since the new route config name has not yet been - // resolved, no update is sent to the channel, and therefore the old route - // configuration (if received) is used until the new one is received. - m.rdsResourceName = update.RouteConfigName + m.currentListenerUpdate = update + + if update.InlineRouteConfig != nil { + // If there was a previous route config watcher because of a non-inline + // route configuration, cancel it. + m.rdsResourceName = "" if m.routeConfigWatcher != nil { m.routeConfigWatcher.stop() - m.currentVirtualHost = nil + m.routeConfigWatcher = nil } - m.routeConfigWatcher = newRouteConfigWatcher(m.rdsResourceName, m) - }, onDone) + m.applyRouteConfigUpdateLocked(update.InlineRouteConfig) + return + } + + // We get here only if there was no inline route configuration. If the route + // config name has not changed, send an update with existing route + // configuration and the newly received listener configuration. + if m.rdsResourceName == update.RouteConfigName { + m.maybeSendUpdateLocked() + return + } + + // If the route config name has changed, cancel the old watcher and start a + // new one. At this point, since the new route config name has not yet been + // resolved, no update is sent to the channel, and therefore the old route + // configuration (if received) is used until the new one is received. 
+ m.rdsResourceName = update.RouteConfigName + if m.routeConfigWatcher != nil { + m.routeConfigWatcher.stop() + m.currentVirtualHost = nil + } + m.routeConfigWatcher = newRouteConfigWatcher(m.rdsResourceName, m) } func (m *DependencyManager) onListenerResourceError(err error, onDone func()) { - m.serializer.ScheduleOr(func(context.Context) { - defer onDone() - m.logger.Warningf("Received resource error for Listener resource %q: %v", m.ldsResourceName, m.annotateErrorWithNodeID(err)) + m.mu.Lock() + defer m.mu.Unlock() - if m.routeConfigWatcher != nil { - m.routeConfigWatcher.stop() - } - m.rdsResourceName = "" - m.currentVirtualHost = nil - m.routeConfigWatcher = nil - m.watcher.Error(fmt.Errorf("listener resource error: %v", m.annotateErrorWithNodeID(err))) - }, onDone) + defer onDone() + if m.stopped { + return + } + + m.logger.Warningf("Received resource error for Listener resource %q: %v", m.ldsResourceName, m.annotateErrorWithNodeID(err)) + + if m.routeConfigWatcher != nil { + m.routeConfigWatcher.stop() + } + m.rdsResourceName = "" + m.currentVirtualHost = nil + m.routeConfigWatcher = nil + m.watcher.Error(fmt.Errorf("listener resource error: %v", m.annotateErrorWithNodeID(err))) } // onListenerResourceAmbientError handles ambient errors received from the @@ -213,34 +224,42 @@ func (m *DependencyManager) onListenerResourceError(err error, onDone func()) { // state of the resource, no change is made to the current configuration and the // errors are only logged for visibility. 
func (m *DependencyManager) onListenerResourceAmbientError(err error, onDone func()) { - m.serializer.ScheduleOr(func(context.Context) { - defer onDone() - m.logger.Warningf("Listener resource ambient error: %v", m.annotateErrorWithNodeID(err)) - }, onDone) + m.mu.Lock() + defer m.mu.Unlock() + + defer onDone() + if m.stopped { + return + } + + m.logger.Warningf("Listener resource ambient error: %v", m.annotateErrorWithNodeID(err)) } func (m *DependencyManager) onRouteConfigResourceUpdate(resourceName string, update *xdsresource.RouteConfigUpdate, onDone func()) { - m.serializer.ScheduleOr(func(context.Context) { - defer onDone() - if m.rdsResourceName != resourceName { - return - } - if m.logger.V(2) { - m.logger.Infof("Received update for RouteConfiguration resource %q: %+v", resourceName, update) - } - m.applyRouteConfigUpdateLocked(update) - }, onDone) + m.mu.Lock() + defer m.mu.Unlock() + + defer onDone() + if m.stopped || m.rdsResourceName != resourceName { + return + } + + if m.logger.V(2) { + m.logger.Infof("Received update for RouteConfiguration resource %q: %+v", resourceName, update) + } + m.applyRouteConfigUpdateLocked(update) } func (m *DependencyManager) onRouteConfigResourceError(resourceName string, err error, onDone func()) { - m.serializer.ScheduleOr(func(context.Context) { - defer onDone() - if m.rdsResourceName != resourceName { - return - } - m.logger.Warningf("Received resource error for RouteConfiguration resource %q: %v", resourceName, m.annotateErrorWithNodeID(err)) - m.watcher.Error(fmt.Errorf("route resource error: %v", m.annotateErrorWithNodeID(err))) - }, onDone) + m.mu.Lock() + defer m.mu.Unlock() + + defer onDone() + if m.stopped || m.rdsResourceName != resourceName { + return + } + m.logger.Warningf("Received resource error for RouteConfiguration resource %q: %v", resourceName, m.annotateErrorWithNodeID(err)) + m.watcher.Error(fmt.Errorf("route resource error: %v", m.annotateErrorWithNodeID(err))) } // onRouteResourceAmbientError 
handles ambient errors received from the route @@ -248,11 +267,13 @@ func (m *DependencyManager) onRouteConfigResourceError(resourceName string, err // resource, no change is made to the current configuration and the errors are // only logged for visibility. func (m *DependencyManager) onRouteConfigResourceAmbientError(resourceName string, err error, onDone func()) { - m.serializer.ScheduleOr(func(context.Context) { - defer onDone() - if m.rdsResourceName != resourceName { - return - } - m.logger.Warningf("Route resource ambient error %q: %v", resourceName, m.annotateErrorWithNodeID(err)) - }, onDone) + m.mu.Lock() + defer m.mu.Unlock() + + defer onDone() + if m.stopped || m.rdsResourceName != resourceName { + return + } + + m.logger.Warningf("Route resource ambient error %q: %v", resourceName, m.annotateErrorWithNodeID(err)) } From a27125fb993f788477c19dd39f50d1ba4b41d8d6 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Thu, 20 Nov 2025 12:22:47 +0530 Subject: [PATCH 3/9] changes --- internal/xds/resolver/xds_resolver.go | 7 +++ internal/xds/resolver/xds_resolver_test.go | 61 ++++++++++++++++++++++ test/xds/xds_client_federation_test.go | 2 +- 3 files changed, 69 insertions(+), 1 deletion(-) diff --git a/internal/xds/resolver/xds_resolver.go b/internal/xds/resolver/xds_resolver.go index 7df74dd4c02d..1ce7e34384ab 100644 --- a/internal/xds/resolver/xds_resolver.go +++ b/internal/xds/resolver/xds_resolver.go @@ -219,6 +219,13 @@ type xdsResolver struct { channelID uint64 dm *xdsdepmgr.DependencyManager + // All methods on the xdsResolver type except for the ones invoked by gRPC, + // i.e ResolveNow() and Close(), are guaranteed to execute in the context of + // this serializer's callback. And since the serializer guarantees mutual + // exclusion among these callbacks, we can get by without any mutexes to + // access all of the below defined state. 
The only exception is Close(), + // which does access some of this shared state, but it does so after + // cancelling the context passed to the serializer. serializer *grpcsync.CallbackSerializer serializerCancel context.CancelFunc diff --git a/internal/xds/resolver/xds_resolver_test.go b/internal/xds/resolver/xds_resolver_test.go index 4818c28181c4..bc0b80ff0650 100644 --- a/internal/xds/resolver/xds_resolver_test.go +++ b/internal/xds/resolver/xds_resolver_test.go @@ -359,6 +359,67 @@ func (s) TestResolverBadServiceUpdate_NACKedWithoutCache(t *testing.T) { } } +// Tests the case where a resource, present in cache, returned by the +// management server is NACKed by the xDS client, which then returns +// an update containing an ambient error to the resolver. Verifies that the +// update is propagated to the ClientConn by the resolver. It tests the +// case where the resolver gets a good update first, and an error +// after the good update. The test also verifies that these are propagated to +// the ClientConn and that RPC succeeds as expected after receiving good update +// as well as ambient error. +func (s) TestResolverBadServiceUpdate_NACKedWithCache(t *testing.T) { + // Spin up an xDS management server for the test. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + nodeID := uuid.New().String() + mgmtServer, _, _, bc := setupManagementServerForTest(t, nodeID) + + stateCh, _, _ := buildResolverForTarget(t, resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)}, bc) + + // Configure good listener and route configuration resources on the + // management server. 
+ listeners := []*v3listenerpb.Listener{e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName)} + routes := []*v3routepb.RouteConfiguration{e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName)} + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, listeners, routes) + + // Expect a good update from the resolver. + cs := verifyUpdateFromResolver(ctx, t, stateCh, wantServiceConfig(defaultTestClusterName)) + + // "Make an RPC" by invoking the config selector. + _, err := cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) + if err != nil { + t.Fatalf("cs.SelectConfig(): %v", err) + } + + // Configure a listener resource that is expected to be NACKed because it + // does not contain the `RouteSpecifier` field in the HTTPConnectionManager. + hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{ + HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})}, + }) + lis := &v3listenerpb.Listener{ + Name: defaultTestServiceName, + ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm}, + FilterChains: []*v3listenerpb.FilterChain{{ + Name: "filter-chain-name", + Filters: []*v3listenerpb.Filter{{ + Name: wellknown.HTTPConnectionManager, + ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm}, + }}, + }}, + } + + // Expect an error update from the resolver. Since the resource is cached, + // it should be received as an ambient error. + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{lis}, nil) + + // "Make an RPC" by invoking the config selector which should succeed by + // continuing to use the previously cached resource. 
+ _, err = cs.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: "/service/method"}) + if err != nil { + t.Fatalf("cs.SelectConfig(): %v", err) + } +} + // TestResolverGoodServiceUpdate tests the case where the resource returned by // the management server is ACKed by the xDS client, which then returns a good // service update to the resolver. The test verifies that the service config diff --git a/test/xds/xds_client_federation_test.go b/test/xds/xds_client_federation_test.go index df4a7a1383bb..27c33ccfb074 100644 --- a/test/xds/xds_client_federation_test.go +++ b/test/xds/xds_client_federation_test.go @@ -308,7 +308,7 @@ func (s) TestFederation_UnknownAuthorityInDialTarget(t *testing.T) { // LDS resource associated with the dial target contains an RDS resource name // with an authority which is not specified in the bootstrap configuration. The // test verifies that RPCs fail with an appropriate error. -func (s) TestFederation_UnknownAuthorityInReceivedResponse(t *testing.T) { +func TestFederation_UnknownAuthorityInReceivedResponse(t *testing.T) { mgmtServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) // LDS is old style name. From 3f87474474fa306d9c767950c7f95944a08424c2 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Fri, 21 Nov 2025 20:02:47 +0530 Subject: [PATCH 4/9] add s to the test --- test/xds/xds_client_federation_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/xds/xds_client_federation_test.go b/test/xds/xds_client_federation_test.go index 27c33ccfb074..df4a7a1383bb 100644 --- a/test/xds/xds_client_federation_test.go +++ b/test/xds/xds_client_federation_test.go @@ -308,7 +308,7 @@ func (s) TestFederation_UnknownAuthorityInDialTarget(t *testing.T) { // LDS resource associated with the dial target contains an RDS resource name // with an authority which is not specified in the bootstrap configuration. The // test verifies that RPCs fail with an appropriate error. 
-func TestFederation_UnknownAuthorityInReceivedResponse(t *testing.T) { +func (s) TestFederation_UnknownAuthorityInReceivedResponse(t *testing.T) { mgmtServer, nodeID, _, xdsResolver := setup.ManagementServerAndResolver(t) // LDS is old style name. From 9854e77a874f41ba866e2c9e5a9d9ed46f8e7682 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Tue, 25 Nov 2025 16:44:31 +0530 Subject: [PATCH 5/9] minor cleanups --- .../clients/xdsclient/clientimpl_watchers.go | 10 +- .../xds/clients/xdsclient/resource_watcher.go | 3 + internal/xds/resolver/xds_resolver.go | 110 +++++++++--------- 3 files changed, 61 insertions(+), 62 deletions(-) diff --git a/internal/xds/clients/xdsclient/clientimpl_watchers.go b/internal/xds/clients/xdsclient/clientimpl_watchers.go index 244464d4b886..14ef8dda5b84 100644 --- a/internal/xds/clients/xdsclient/clientimpl_watchers.go +++ b/internal/xds/clients/xdsclient/clientimpl_watchers.go @@ -42,9 +42,11 @@ func (w *wrappingWatcher) ResourceError(err error, done func()) { // WatchResource starts watching the specified resource. // -// typeURL specifies the resource type implementation to use. The watch fails -// if there is no resource type implementation for the given typeURL. See the -// ResourceTypes field in the Config struct used to create the XDSClient. +// typeURL specifies the resource type implementation to use. If there is no +// resource type implementation for the given typeURL, or if authority is not +// found for the resource name, no watch is started and a no-op cancel function +// is returned. See the ResourceTypes field in the Config struct used to create +// the XDSClient. // // The returned function cancels the watch and prevents future calls to the // watcher.
@@ -74,7 +76,7 @@ func (c *XDSClient) WatchResource(typeURL, resourceName string, watcher Resource if a == nil { logger.Warningf("Watch registered for name %q of type %q, authority %q is not found", rType.TypeName, resourceName, n.Authority) c.serializer.TrySchedule(func(context.Context) { - watcher.ResourceError(fmt.Errorf("authority %q not found in bootstrap config for resource %q", n.Authority, resourceName), func() {}) + watcher.ResourceError(fmt.Errorf("authority %q not found in the config for resource %q", n.Authority, resourceName), func() {}) }) return func() {} } diff --git a/internal/xds/clients/xdsclient/resource_watcher.go b/internal/xds/clients/xdsclient/resource_watcher.go index 37d01bc71e76..e167be1078cc 100644 --- a/internal/xds/clients/xdsclient/resource_watcher.go +++ b/internal/xds/clients/xdsclient/resource_watcher.go @@ -21,6 +21,9 @@ package xdsclient // ResourceWatcher is notified of the resource updates and errors that are // received by the xDS client from the management server. // +// All methods on this interface are guaranteed to be called serially by the xDS +// client. Only one method will be executing for a specific watcher at a time. +// // All methods contain a done parameter which should be called when processing // of the update has completed. 
For example, if processing a resource requires // watching new resources, those watches should be completed before done is diff --git a/internal/xds/resolver/xds_resolver.go b/internal/xds/resolver/xds_resolver.go index 1ce7e34384ab..c3c36e4d4692 100644 --- a/internal/xds/resolver/xds_resolver.go +++ b/internal/xds/resolver/xds_resolver.go @@ -29,7 +29,6 @@ import ( "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/internal/grpcsync" - "google.golang.org/grpc/internal/pretty" iresolver "google.golang.org/grpc/internal/resolver" "google.golang.org/grpc/internal/wrr" "google.golang.org/grpc/internal/xds/bootstrap" @@ -112,30 +111,6 @@ type xdsResolverBuilder struct { // The xds bootstrap process is performed (and a new xDS client is built) every // time an xds resolver is built. func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (_ resolver.Resolver, retErr error) { - r := &xdsResolver{ - cc: cc, - activeClusters: make(map[string]*clusterInfo), - channelID: rand.Uint64(), - } - defer func() { - if retErr != nil { - r.Close() - } - }() - r.logger = prefixLogger(r) - r.logger.Infof("Creating resolver for target: %+v", target) - - // Initialize the serializer used to synchronize the following: - // - updates from the xDS client. This could lead to generation of new - // service config if resolution is complete. - // - completion of an RPC to a removed cluster causing the associated ref - // count to become zero, resulting in generation of new service config. - // - stopping of a config selector that results in generation of new service - // config. - ctx, cancel := context.WithCancel(context.Background()) - r.serializer = grpcsync.NewCallbackSerializer(ctx) - r.serializerCancel = cancel - // Initialize the xDS client. 
newXDSClient := rinternal.NewXDSClient.(func(string, estats.MetricsRecorder) (xdsclient.XDSClient, func(), error)) if b.newXDSClient != nil { @@ -145,15 +120,34 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon if err != nil { return nil, fmt.Errorf("xds: failed to create xds-client: %v", err) } - r.xdsClient = client - r.xdsClientClose = closeFn // Determine the listener resource name and start a watcher for it. - template, err := r.sanityChecksOnBootstrapConfig(target, opts, r.xdsClient) + template, err := sanityChecksOnBootstrapConfig(target, opts, client) if err != nil { return nil, err } - r.ldsResourceName = bootstrap.PopulateResourceTemplate(template, target.Endpoint()) + + r := &xdsResolver{ + cc: cc, + xdsClient: client, + xdsClientClose: closeFn, + activeClusters: make(map[string]*clusterInfo), + channelID: rand.Uint64(), + ldsResourceName: bootstrap.PopulateResourceTemplate(template, target.Endpoint()), + } + r.logger = prefixLogger(r) + r.logger.Infof("Creating resolver for target: %+v", target) + + // Initialize the serializer used to synchronize the following: - updates + // from the dependency manager. This could lead to generation of new + // service config if resolution is complete. + // - completion of an RPC to a removed cluster causing the associated ref + // count to become zero, resulting in generation of new service config. + // - stopping of a config selector that results in generation of new service + // config. + ctx, cancel := context.WithCancel(context.Background()) + r.serializer = grpcsync.NewCallbackSerializer(ctx) + r.serializerCancel = cancel r.dm = xdsdepmgr.New(r.ldsResourceName, opts.Authority, r.xdsClient, r) return r, nil } @@ -167,7 +161,7 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon // // Returns the listener resource name template to use. If any of the above // validations fail, a non-nil error is returned. 
-func (r *xdsResolver) sanityChecksOnBootstrapConfig(target resolver.Target, _ resolver.BuildOptions, client xdsclient.XDSClient) (string, error) { +func sanityChecksOnBootstrapConfig(target resolver.Target, _ resolver.BuildOptions, client xdsclient.XDSClient) (string, error) { bootstrapConfig := client.BootstrapConfig() if bootstrapConfig == nil { // This is never expected to happen after a successful xDS client @@ -205,37 +199,35 @@ func (*xdsResolverBuilder) Scheme() string { // xdsResolver implements the resolver.Resolver interface. // -// It registers a watcher for ServiceConfig updates with the xdsClient object -// (which performs LDS/RDS queries for the same), and passes the received -// updates to the ClientConn. +// It manages the dependency manager which in turn manages all xDS resource +// watches. It receives the xDS resource config and passes them to ClientConn. type xdsResolver struct { - cc resolver.ClientConn - logger *grpclog.PrefixLogger + // The following fields are initialized at creation time and are read-only + // after that. + cc resolver.ClientConn + logger *grpclog.PrefixLogger + ldsResourceName string + dm *xdsdepmgr.DependencyManager // The underlying xdsClient which performs all xDS requests and responses. xdsClient xdsclient.XDSClient xdsClientClose func() // A random number which uniquely identifies the channel which owns this // resolver. channelID uint64 - dm *xdsdepmgr.DependencyManager - // All methods on the xdsResolver type except for the ones invoked by gRPC, // i.e ResolveNow() and Close(), are guaranteed to execute in the context of - // this serializer's callback. And since the serializer guarantees mutual - // exclusion among these callbacks, we can get by without any mutexes to - // access all of the below defined state. The only exception is Close(), - // which does access some of this shared state, but it does so after - // cancelling the context passed to the serializer. + // this serializer's callback. 
We use the serializer because these shared + // states are accessed by each RPC when it is committed, and so + // serializer is preferred over a mutex. serializer *grpcsync.CallbackSerializer serializerCancel context.CancelFunc - ldsResourceName string - rdsResourceName string - + // The following fields are accessed only from within the serializer + // callbacks. + xdsConfig *xdsresource.XDSConfig // activeClusters is a map from cluster name to information about the // cluster that includes a ref count and load balancing configuration. - activeClusters map[string]*clusterInfo - + activeClusters map[string]*clusterInfo curConfigSelector stoppableConfigSelector } @@ -269,8 +261,8 @@ func (r *xdsResolver) Close() { // - updates the current config selector used by the resolver. func (r *xdsResolver) Update(config *xdsresource.XDSConfig) { r.serializer.TrySchedule(func(context.Context) { - r.rdsResourceName = config.Listener.RouteConfigName - cs, err := r.newConfigSelector(config) + r.xdsConfig = config + cs, err := r.newConfigSelector() if err != nil { r.onResourceError(err) return @@ -326,7 +318,9 @@ func (r *xdsResolver) sendNewServiceConfig(cs stoppableConfigSelector) bool { } sc := serviceConfigJSON(r.activeClusters) - r.logger.Infof("For Listener resource %q and RouteConfiguration resource %q, generated service config: %v", r.ldsResourceName, r.rdsResourceName, pretty.FormatJSON(sc)) + if r.logger.V(2) { + r.logger.Infof("For Listener resource %q and RouteConfiguration resource %q, generated service config: %+v", r.ldsResourceName, r.xdsConfig.Listener.RouteConfigName, sc) + } // Send the update to the ClientConn. state := iresolver.SetConfigSelector(resolver.State{ @@ -346,7 +340,7 @@ func (r *xdsResolver) sendNewServiceConfig(cs stoppableConfigSelector) bool { // r.activeClusters for previously-unseen clusters. // // Only executed in the context of a serializer callback.
-func (r *xdsResolver) newConfigSelector(config *xdsresource.XDSConfig) (*configSelector, error) { +func (r *xdsResolver) newConfigSelector() (*configSelector, error) { cs := &configSelector{ channelID: r.channelID, xdsNodeID: r.xdsClient.BootstrapConfig().Node().GetId(), @@ -356,25 +350,25 @@ func (r *xdsResolver) newConfigSelector(config *xdsresource.XDSConfig) (*configS }) }, virtualHost: virtualHost{ - retryConfig: config.VirtualHost.RetryConfig, + retryConfig: r.xdsConfig.VirtualHost.RetryConfig, }, - routes: make([]route, len(config.VirtualHost.Routes)), + routes: make([]route, len(r.xdsConfig.VirtualHost.Routes)), clusters: make(map[string]*clusterInfo), - httpFilterConfig: config.Listener.HTTPFilters, + httpFilterConfig: r.xdsConfig.Listener.HTTPFilters, } - for i, rt := range config.VirtualHost.Routes { + for i, rt := range r.xdsConfig.VirtualHost.Routes { clusters := rinternal.NewWRR.(func() wrr.WRR)() if rt.ClusterSpecifierPlugin != "" { clusterName := clusterSpecifierPluginPrefix + rt.ClusterSpecifierPlugin clusters.Add(&routeCluster{name: clusterName}, 1) ci := r.addOrGetActiveClusterInfo(clusterName) - ci.cfg = xdsChildConfig{ChildPolicy: balancerConfig(config.RouteConfig.ClusterSpecifierPlugins[rt.ClusterSpecifierPlugin])} + ci.cfg = xdsChildConfig{ChildPolicy: balancerConfig(r.xdsConfig.RouteConfig.ClusterSpecifierPlugins[rt.ClusterSpecifierPlugin])} cs.clusters[clusterName] = ci } else { for _, wc := range rt.WeightedClusters { clusterName := clusterPrefix + wc.Name - interceptor, err := newInterceptor(config.Listener.HTTPFilters, wc.HTTPFilterConfigOverride, rt.HTTPFilterConfigOverride, config.VirtualHost.HTTPFilterConfigOverride) + interceptor, err := newInterceptor(r.xdsConfig.Listener.HTTPFilters, wc.HTTPFilterConfigOverride, rt.HTTPFilterConfigOverride, r.xdsConfig.VirtualHost.HTTPFilterConfigOverride) if err != nil { return nil, err } @@ -392,7 +386,7 @@ func (r *xdsResolver) newConfigSelector(config *xdsresource.XDSConfig) (*configS 
cs.routes[i].m = xdsresource.RouteToMatcher(rt) cs.routes[i].actionType = rt.ActionType if rt.MaxStreamDuration == nil { - cs.routes[i].maxStreamDuration = config.Listener.MaxStreamDuration + cs.routes[i].maxStreamDuration = r.xdsConfig.Listener.MaxStreamDuration } else { cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration } From 33385203ae0c2e6912c663cec7d66f2d0b60a2a3 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Tue, 25 Nov 2025 17:16:24 +0530 Subject: [PATCH 6/9] comment --- internal/xds/resolver/xds_resolver.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/xds/resolver/xds_resolver.go b/internal/xds/resolver/xds_resolver.go index c3c36e4d4692..f7b5651789d0 100644 --- a/internal/xds/resolver/xds_resolver.go +++ b/internal/xds/resolver/xds_resolver.go @@ -121,11 +121,11 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon return nil, fmt.Errorf("xds: failed to create xds-client: %v", err) } - // Determine the listener resource name and start a watcher for it. template, err := sanityChecksOnBootstrapConfig(target, opts, client) if err != nil { return nil, err } + ldsResourceName := bootstrap.PopulateResourceTemplate(template, target.Endpoint()) r := &xdsResolver{ cc: cc, @@ -133,14 +133,14 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon xdsClientClose: closeFn, activeClusters: make(map[string]*clusterInfo), channelID: rand.Uint64(), - ldsResourceName: bootstrap.PopulateResourceTemplate(template, target.Endpoint()), + ldsResourceName: ldsResourceName, } r.logger = prefixLogger(r) r.logger.Infof("Creating resolver for target: %+v", target) - // Initialize the serializer used to synchronize the following: - updates - // from the dependency manager. This could lead to generation of new - // service config if resolution is complete. + // Initialize the serializer used to synchronize the following: + // - updates from the dependency manager. 
This could lead to generation of + // new service config if resolution is complete. // - completion of an RPC to a removed cluster causing the associated ref // count to become zero, resulting in generation of new service config. // - stopping of a config selector that results in generation of new service From fdbd47e89d3f864ea4626a2995f03f881f6a6287 Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Tue, 25 Nov 2025 17:26:01 +0530 Subject: [PATCH 7/9] xdsCLientClose --- internal/xds/resolver/xds_resolver.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/xds/resolver/xds_resolver.go b/internal/xds/resolver/xds_resolver.go index f7b5651789d0..c985cb42cf16 100644 --- a/internal/xds/resolver/xds_resolver.go +++ b/internal/xds/resolver/xds_resolver.go @@ -116,13 +116,14 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon if b.newXDSClient != nil { newXDSClient = b.newXDSClient } - client, closeFn, err := newXDSClient(target.String(), opts.MetricsRecorder) + client, xdsClientClose, err := newXDSClient(target.String(), opts.MetricsRecorder) if err != nil { return nil, fmt.Errorf("xds: failed to create xds-client: %v", err) } template, err := sanityChecksOnBootstrapConfig(target, opts, client) if err != nil { + xdsClientClose() return nil, err } ldsResourceName := bootstrap.PopulateResourceTemplate(template, target.Endpoint()) @@ -130,7 +131,7 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon r := &xdsResolver{ cc: cc, xdsClient: client, - xdsClientClose: closeFn, + xdsClientClose: xdsClientClose, activeClusters: make(map[string]*clusterInfo), channelID: rand.Uint64(), ldsResourceName: ldsResourceName, From c5f2c3535142a0789f5aa4655900c134bac1f54a Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Thu, 27 Nov 2025 11:30:46 +0530 Subject: [PATCH 8/9] minor changes --- internal/xds/resolver/xds_resolver.go | 36 +++++++++++++-------------- 1 file changed, 17 insertions(+), 
19 deletions(-) diff --git a/internal/xds/resolver/xds_resolver.go b/internal/xds/resolver/xds_resolver.go index c985cb42cf16..7b3e9060967f 100644 --- a/internal/xds/resolver/xds_resolver.go +++ b/internal/xds/resolver/xds_resolver.go @@ -128,6 +128,7 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon } ldsResourceName := bootstrap.PopulateResourceTemplate(template, target.Endpoint()) + ctx, cancel := context.WithCancel(context.Background()) r := &xdsResolver{ cc: cc, xdsClient: client, @@ -135,20 +136,20 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon activeClusters: make(map[string]*clusterInfo), channelID: rand.Uint64(), ldsResourceName: ldsResourceName, + + // serializer used to synchronize the following: + // - updates from the dependency manager. This could lead to generation + // of new service config if resolution is complete. + // - completion of an RPC to a removed cluster causing the associated + // ref count to become zero, resulting in generation of new service + // config. + // - stopping of a config selector that results in generation of new + // service config. + serializer: grpcsync.NewCallbackSerializer(ctx), + serializerCancel: cancel, } r.logger = prefixLogger(r) r.logger.Infof("Creating resolver for target: %+v", target) - - // Initialize the serializer used to synchronize the following: - // - updates from the dependency manager. This could lead to generation of - // new service config if resolution is complete. - // - completion of an RPC to a removed cluster causing the associated ref - // count to become zero, resulting in generation of new service config. - // - stopping of a config selector that results in generation of new service - // config. 
- ctx, cancel := context.WithCancel(context.Background()) - r.serializer = grpcsync.NewCallbackSerializer(ctx) - r.serializerCancel = cancel r.dm = xdsdepmgr.New(r.ldsResourceName, opts.Authority, r.xdsClient, r) return r, nil } @@ -209,17 +210,14 @@ type xdsResolver struct { logger *grpclog.PrefixLogger ldsResourceName string dm *xdsdepmgr.DependencyManager - // The underlying xdsClient which performs all xDS requests and responses. - xdsClient xdsclient.XDSClient - xdsClientClose func() - // A random number which uniquely identifies the channel which owns this - // resolver. - channelID uint64 + xdsClient xdsclient.XDSClient + xdsClientClose func() + channelID uint64 // Unique random ID for the channel owning this resolver. // All methods on the xdsResolver type except for the ones invoked by gRPC, // i.e ResolveNow() and Close(), are guaranteed to execute in the context of // this serializer's callback. We use the serializer because these shared - // states are accessed by each RPC when it is committed,, and so - // serializeris preffered over a mutex. + // states are accessed by each RPC when it is committed, and so + // serializer is preferred over a mutex.
serializer *grpcsync.CallbackSerializer serializerCancel context.CancelFunc From 55e666b739ac11ac02b71dce57989a958de7119c Mon Sep 17 00:00:00 2001 From: eshitachandwani Date: Thu, 27 Nov 2025 21:12:45 +0530 Subject: [PATCH 9/9] minor changes --- internal/xds/resolver/xds_resolver.go | 4 +- internal/xds/resolver/xds_resolver_test.go | 77 +++++++++++++++++++--- 2 files changed, 69 insertions(+), 12 deletions(-) diff --git a/internal/xds/resolver/xds_resolver.go b/internal/xds/resolver/xds_resolver.go index 7b3e9060967f..d877d42700ac 100644 --- a/internal/xds/resolver/xds_resolver.go +++ b/internal/xds/resolver/xds_resolver.go @@ -121,7 +121,7 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon return nil, fmt.Errorf("xds: failed to create xds-client: %v", err) } - template, err := sanityChecksOnBootstrapConfig(target, opts, client) + template, err := sanityChecksOnBootstrapConfig(target, client) if err != nil { xdsClientClose() return nil, err @@ -163,7 +163,7 @@ func (b *xdsResolverBuilder) Build(target resolver.Target, cc resolver.ClientCon // // Returns the listener resource name template to use. If any of the above // validations fail, a non-nil error is returned. 
-func sanityChecksOnBootstrapConfig(target resolver.Target, _ resolver.BuildOptions, client xdsclient.XDSClient) (string, error) { +func sanityChecksOnBootstrapConfig(target resolver.Target, client xdsclient.XDSClient) (string, error) { bootstrapConfig := client.BootstrapConfig() if bootstrapConfig == nil { // This is never expected to happen after a successful xDS client diff --git a/internal/xds/resolver/xds_resolver_test.go b/internal/xds/resolver/xds_resolver_test.go index bc0b80ff0650..70db68d3b1a8 100644 --- a/internal/xds/resolver/xds_resolver_test.go +++ b/internal/xds/resolver/xds_resolver_test.go @@ -286,6 +286,65 @@ func (s) TestResolverCloseClosesXDSClient(t *testing.T) { } } +// Tests the case where there is no virtual host in the route configuration +// that matches the dataplane authority. Verifies that the resolver returns the +// correct error. +func (s) TestNoMatchingVirtualHost(t *testing.T) { + // Spin up an xDS management server for the test. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + nodeID := uuid.New().String() + mgmtServer, _, _, bc := setupManagementServerForTest(t, nodeID) + + // Configure route resource with no virtual host so that it does not match the authority. + listener := e2e.DefaultClientListener(defaultTestServiceName, defaultTestRouteConfigName) + route := e2e.DefaultRouteConfig(defaultTestRouteConfigName, defaultTestServiceName, defaultTestClusterName) + route.VirtualHosts = nil + configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{listener}, []*v3routepb.RouteConfiguration{route}) + + // Build the resolver inline (duplicating buildResolverForTarget internals) + // to avoid issues with blocked channel writes when NACKs occur. + target := resolver.Target{URL: *testutils.MustParseURL("xds:///" + defaultTestServiceName)} + + // Create an xDS resolver with the provided bootstrap configuration.
+ if internal.NewXDSResolverWithConfigForTesting == nil { + t.Fatalf("internal.NewXDSResolverWithConfigForTesting is nil") + } + + builder, err := internal.NewXDSResolverWithConfigForTesting.(func([]byte) (resolver.Builder, error))(bc) + if err != nil { + t.Fatalf("Failed to create xDS resolver for testing: %v", err) + } + + errCh := testutils.NewChannel() + tcc := &testutils.ResolverClientConn{Logger: t, ReportErrorF: func(err error) { errCh.Replace(err) }} + r, err := builder.Build(target, tcc, resolver.BuildOptions{ + Authority: url.PathEscape(target.Endpoint()), + }) + if err != nil { + t.Fatalf("Failed to build xDS resolver for target %q: %v", target, err) + } + defer r.Close() + + // Wait for and verify the error update from the resolver. + // Since the resource is not cached, it should be received as resource error. + select { + case <-ctx.Done(): + t.Fatalf("Timeout waiting for error to be propagated to the ClientConn") + case gotErr := <-errCh.C: + if gotErr == nil { + t.Fatalf("got nil error from resolver, want error containing 'could not find VirtualHost'") + } + errStr := fmt.Sprint(gotErr) + if !strings.Contains(errStr, fmt.Sprintf("could not find VirtualHost for %q", defaultTestServiceName)) { + t.Fatalf("got error from resolver %q, want error containing 'could not find VirtualHost for %q'", errStr, defaultTestServiceName) + } + if !strings.Contains(errStr, nodeID) { + t.Fatalf("got error from resolver %q, want nodeID %q", errStr, nodeID) + } + } +} + // Tests the case where a resource, not present in cache, returned by the // management server is NACKed by the xDS client, which then returns an update // containing a resource error to the resolver. 
It tests the case where the @@ -359,14 +418,12 @@ func (s) TestResolverBadServiceUpdate_NACKedWithoutCache(t *testing.T) { } } -// Tests the case where a resource, present in cache, returned by the -// management server is NACKed by the xDS client, which then returns -// an update containing an ambient error to the resolver. Verifies that the -// update is propagated to the ClientConn by the resolver. It tests the -// case where the resolver gets a good update first, and an error -// after the good update. The test also verifies that these are propagated to -// the ClientConn and that RPC succeeds as expected after receiving good update -// as well as ambient error. +// Tests the case where a resource, present in cache, returned by the management +// server is NACKed by the xDS client, which then returns an update containing +// an ambient error to the resolver. It tests the case where the resolver gets a +// good update first, and an error after the good update. The test verifies that +// the RPCs succeed as expected after receiving good update as well as ambient +// error. func (s) TestResolverBadServiceUpdate_NACKedWithCache(t *testing.T) { // Spin up an xDS management server for the test. ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -408,8 +465,8 @@ func (s) TestResolverBadServiceUpdate_NACKedWithCache(t *testing.T) { }}, } - // Expect an error update from the resolver. Since the resource is cached, - // it should be received as an ambient error. + // Since the resource is cached, it should be received as an ambient error + // and so the RPCs should continue passing. configureResourcesOnManagementServer(ctx, t, mgmtServer, nodeID, []*v3listenerpb.Listener{lis}, nil) // "Make an RPC" by invoking the config selector which should succeed by