From 8e1cc943b0cad48398ad47d24a1ff92c9c850e45 Mon Sep 17 00:00:00 2001 From: yifeizhuang Date: Thu, 25 Jan 2024 17:05:12 -0800 Subject: [PATCH] xds: change controlPlaneClient and loadReportClient to use xdsTransportFactory (#10829) --- .../java/io/grpc/xds/ControlPlaneClient.java | 241 +++++++----------- .../io/grpc/xds/GrpcXdsTransportFactory.java | 17 +- .../java/io/grpc/xds/LoadReportClient.java | 109 ++++---- .../grpc/xds/SharedXdsClientPoolProvider.java | 4 +- .../main/java/io/grpc/xds/XdsClientImpl.java | 29 +-- .../io/grpc/xds/LoadReportClientTest.java | 4 +- .../io/grpc/xds/XdsClientImplTestBase.java | 18 +- 7 files changed, 173 insertions(+), 249 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java b/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java index 5a344336ba2..ec156a3e939 100644 --- a/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java +++ b/xds/src/main/java/io/grpc/xds/ControlPlaneClient.java @@ -28,23 +28,22 @@ import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest; import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse; -import io.grpc.Channel; import io.grpc.Context; import io.grpc.InternalLogId; -import io.grpc.ManagedChannel; +import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; -import io.grpc.stub.ClientCallStreamObserver; -import io.grpc.stub.ClientResponseObserver; import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.EnvoyProtoData.Node; import io.grpc.xds.XdsClient.ProcessingTracker; import io.grpc.xds.XdsClient.ResourceStore; import io.grpc.xds.XdsClient.XdsResponseHandler; -import io.grpc.xds.XdsClientImpl.XdsChannelFactory; import io.grpc.xds.XdsLogger.XdsLogLevel; +import io.grpc.xds.XdsTransportFactory.EventHandler; +import io.grpc.xds.XdsTransportFactory.StreamingCall; +import io.grpc.xds.XdsTransportFactory.XdsTransport; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -67,7 +66,7 @@ final class ControlPlaneClient { private final InternalLogId logId; private final XdsLogger logger; private final ServerInfo serverInfo; - private final ManagedChannel channel; + private final XdsTransport xdsTransport; private final XdsResponseHandler xdsResponseHandler; private final ResourceStore resourceStore; private final Context context; @@ -84,7 +83,7 @@ final class ControlPlaneClient { private boolean shutdown; @Nullable - private AbstractAdsStream adsStream; + private AdsStream adsStream; @Nullable private BackoffPolicy retryBackoffPolicy; @Nullable @@ -93,7 +92,7 @@ final class ControlPlaneClient { /** An entity that manages ADS RPCs over a single channel. */ // TODO: rename to XdsChannel ControlPlaneClient( - XdsChannelFactory xdsChannelFactory, + XdsTransport xdsTransport, ServerInfo serverInfo, Node bootstrapNode, XdsResponseHandler xdsResponseHandler, @@ -106,7 +105,7 @@ final class ControlPlaneClient { Supplier stopwatchSupplier, XdsClient.TimerLaunch timerLaunch) { this.serverInfo = checkNotNull(serverInfo, "serverInfo"); - this.channel = checkNotNull(xdsChannelFactory, "xdsChannelFactory").create(serverInfo); + this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport"); this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler"); this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber"); this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode"); @@ -121,12 +120,6 @@ final class ControlPlaneClient { logger.log(XdsLogLevel.INFO, "Created"); } - /** The underlying channel. */ - // Currently, only externally used for LrsClient. - Channel channel() { - return channel; - } - void shutdown() { syncContext.execute(new Runnable() { @Override @@ -139,7 +132,7 @@ public void run() { if (rpcRetryTimer != null && rpcRetryTimer.isPending()) { rpcRetryTimer.cancel(); } - channel.shutdown(); + xdsTransport.shutdown(); } }); } @@ -207,7 +200,7 @@ boolean isInBackoff() { } boolean isReady() { - return adsStream != null && adsStream.isReady(); + return adsStream != null && adsStream.call != null && adsStream.call.isReady(); } /** @@ -234,10 +227,9 @@ void readyHandler() { // Must be synchronized. private void startRpcStream() { checkState(adsStream == null, "Previous adsStream has not been cleared yet"); - adsStream = new AdsStreamV3(); Context prevContext = context.attach(); try { - adsStream.start(); + adsStream = new AdsStream(); } finally { context.detach(prevContext); } @@ -271,7 +263,7 @@ XdsResourceType fromTypeUrl(String typeUrl) { return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl); } - private abstract class AbstractAdsStream { + private class AdsStream implements EventHandler { private boolean responseReceived; private boolean closed; // Response nonce for the most recently received discovery responses of each resource type. @@ -281,14 +273,15 @@ private abstract class AbstractAdsStream { // To avoid confusion, client-initiated requests will always use the nonce in // most recently received responses of each resource type. private final Map, String> respNonces = new HashMap<>(); - - abstract void start(); - - abstract void sendError(Exception error); - - abstract boolean isReady(); - - abstract void request(int count); + private final StreamingCall call; + private final MethodDescriptor methodDescriptor = + AggregatedDiscoveryServiceGrpc.getStreamAggregatedResourcesMethod(); + + private AdsStream() { + this.call = xdsTransport.createStreamingCall(methodDescriptor.getFullMethodName(), + methodDescriptor.getRequestMarshaller(), methodDescriptor.getResponseMarshaller()); + call.start(this); + } /** * Sends a discovery request with the given {@code versionInfo}, {@code nonce} and @@ -296,8 +289,30 @@ private abstract class AbstractAdsStream { * client-initiated discovery requests, use {@link * #sendDiscoveryRequest(XdsResourceType, Collection)}. */ - abstract void sendDiscoveryRequest(XdsResourceType type, String version, - Collection resources, String nonce, @Nullable String errorDetail); + void sendDiscoveryRequest(XdsResourceType type, String versionInfo, + Collection resources, String nonce, + @Nullable String errorDetail) { + DiscoveryRequest.Builder builder = + DiscoveryRequest.newBuilder() + .setVersionInfo(versionInfo) + .setNode(bootstrapNode.toEnvoyProtoNode()) + .addAllResourceNames(resources) + .setTypeUrl(type.typeUrl()) + .setResponseNonce(nonce); + if (errorDetail != null) { + com.google.rpc.Status error = + com.google.rpc.Status.newBuilder() + .setCode(Code.INVALID_ARGUMENT_VALUE) // FIXME(chengyuanzhang): use correct code + .setMessage(errorDetail) + .build(); + builder.setErrorDetail(error); + } + DiscoveryRequest request = builder.build(); + call.sendMessage(request); + if (logger.isLoggable(XdsLogLevel.DEBUG)) { + logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", MessagePrinter.print(request)); + } + } /** * Sends a client-initiated discovery request. @@ -308,6 +323,48 @@ final void sendDiscoveryRequest(XdsResourceType type, Collection reso respNonces.getOrDefault(type, ""), null); } + @Override + public void onReady() { + readyHandler(); + } + + @Override + public void onRecvMessage(DiscoveryResponse response) { + syncContext.execute(new Runnable() { + @Override + public void run() { + XdsResourceType type = fromTypeUrl(response.getTypeUrl()); + if (logger.isLoggable(XdsLogLevel.DEBUG)) { + logger.log( + XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type, + MessagePrinter.print(response)); + } + if (type == null) { + logger.log( + XdsLogLevel.WARNING, + "Ignore an unknown type of DiscoveryResponse: {0}", + response.getTypeUrl()); + + call.startRecvMessage(); + return; + } + handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(), + response.getNonce()); + } + }); + } + + @Override + public void onStatusReceived(final Status status) { + syncContext.execute(() -> { + if (status.isOk()) { + handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER)); + } else { + handleRpcStreamClosed(status); + } + }); + } + final void handleRpcResponse(XdsResourceType type, String versionInfo, List resources, String nonce) { checkNotNull(type, "type"); @@ -316,20 +373,13 @@ final void handleRpcResponse(XdsResourceType type, String versionInfo, List request(1), syncContext); + ProcessingTracker processingTracker = new ProcessingTracker( + () -> call.startRecvMessage(), syncContext); xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce, processingTracker); processingTracker.onComplete(); } - final void handleRpcError(Throwable t) { - handleRpcStreamClosed(Status.fromThrowable(t)); - } - - final void handleRpcCompleted() { - handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER)); - } - private void handleRpcStreamClosed(Status error) { if (closed) { return; @@ -366,7 +416,7 @@ private void close(Exception error) { } closed = true; cleanUp(); - sendError(error); + call.sendError(error); } private void cleanUp() { @@ -375,115 +425,4 @@ private void cleanUp() { } } } - - private final class AdsStreamV3 extends AbstractAdsStream { - private ClientCallStreamObserver requestWriter; - - @Override - public boolean isReady() { - return requestWriter != null && ((ClientCallStreamObserver) requestWriter).isReady(); - } - - @Override - @SuppressWarnings("unchecked") - void start() { - AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub = - AggregatedDiscoveryServiceGrpc.newStub(channel); - - final class AdsClientResponseObserver - implements ClientResponseObserver { - - @Override - public void beforeStart(ClientCallStreamObserver requestStream) { - requestStream.disableAutoRequestWithInitial(1); - requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler); - } - - @Override - public void onNext(final DiscoveryResponse response) { - syncContext.execute(new Runnable() { - @Override - public void run() { - XdsResourceType type = fromTypeUrl(response.getTypeUrl()); - if (logger.isLoggable(XdsLogLevel.DEBUG)) { - logger.log( - XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type, - MessagePrinter.print(response)); - } - if (type == null) { - logger.log( - XdsLogLevel.WARNING, - "Ignore an unknown type of DiscoveryResponse: {0}", - response.getTypeUrl()); - request(1); - return; - } - handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(), - response.getNonce()); - } - }); - } - - @Override - public void onError(final Throwable t) { - syncContext.execute(new Runnable() { - @Override - public void run() { - handleRpcError(t); - } - }); - } - - @Override - public void onCompleted() { - syncContext.execute(new Runnable() { - @Override - public void run() { - handleRpcCompleted(); - } - }); - } - } - - requestWriter = (ClientCallStreamObserver) stub.streamAggregatedResources( - new AdsClientResponseObserver()); - } - - @Override - void sendDiscoveryRequest(XdsResourceType type, String versionInfo, - Collection resources, String nonce, - @Nullable String errorDetail) { - checkState(requestWriter != null, "ADS stream has not been started"); - DiscoveryRequest.Builder builder = - DiscoveryRequest.newBuilder() - .setVersionInfo(versionInfo) - .setNode(bootstrapNode.toEnvoyProtoNode()) - .addAllResourceNames(resources) - .setTypeUrl(type.typeUrl()) - .setResponseNonce(nonce); - if (errorDetail != null) { - com.google.rpc.Status error = - com.google.rpc.Status.newBuilder() - .setCode(Code.INVALID_ARGUMENT_VALUE) // FIXME(chengyuanzhang): use correct code - .setMessage(errorDetail) - .build(); - builder.setErrorDetail(error); - } - DiscoveryRequest request = builder.build(); - requestWriter.onNext(request); - if (logger.isLoggable(XdsLogLevel.DEBUG)) { - logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", MessagePrinter.print(request)); - } - } - - @Override - void request(int count) { - requestWriter.request(count); - } - - @Override - void sendError(Exception error) { - requestWriter.onError(error); - } - } } diff --git a/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java b/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java index b63b3cc0d88..3ff61ada040 100644 --- a/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java +++ b/xds/src/main/java/io/grpc/xds/GrpcXdsTransportFactory.java @@ -18,6 +18,7 @@ import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.annotations.VisibleForTesting; import io.grpc.CallOptions; import io.grpc.ChannelCredentials; import io.grpc.ClientCall; @@ -30,14 +31,21 @@ final class GrpcXdsTransportFactory implements XdsTransportFactory { - static final XdsTransportFactory DEFAULT_XDS_TRANSPORT_FACTORY = new GrpcXdsTransportFactory(); + static final GrpcXdsTransportFactory DEFAULT_XDS_TRANSPORT_FACTORY = + new GrpcXdsTransportFactory(); @Override public XdsTransport create(Bootstrapper.ServerInfo serverInfo) { return new GrpcXdsTransport(serverInfo); } - private class GrpcXdsTransport implements XdsTransport { + @VisibleForTesting + public XdsTransport createForTest(ManagedChannel channel) { + return new GrpcXdsTransport(channel); + } + + @VisibleForTesting + static class GrpcXdsTransport implements XdsTransport { private final ManagedChannel channel; @@ -49,6 +57,11 @@ public GrpcXdsTransport(Bootstrapper.ServerInfo serverInfo) { .build(); } + @VisibleForTesting + public GrpcXdsTransport(ManagedChannel channel) { + this.channel = checkNotNull(channel, "channel"); + } + @Override public StreamingCall createStreamingCall( String fullMethodName, diff --git a/xds/src/main/java/io/grpc/xds/LoadReportClient.java b/xds/src/main/java/io/grpc/xds/LoadReportClient.java index b86c8110f63..fe22586b074 100644 --- a/xds/src/main/java/io/grpc/xds/LoadReportClient.java +++ b/xds/src/main/java/io/grpc/xds/LoadReportClient.java @@ -27,19 +27,21 @@ import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc; import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest; import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse; -import io.grpc.Channel; import io.grpc.Context; import io.grpc.InternalLogId; +import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; import io.grpc.internal.BackoffPolicy; -import io.grpc.stub.StreamObserver; import io.grpc.xds.EnvoyProtoData.Node; import io.grpc.xds.Stats.ClusterStats; import io.grpc.xds.Stats.DroppedRequests; import io.grpc.xds.Stats.UpstreamLocalityStats; import io.grpc.xds.XdsLogger.XdsLogLevel; +import io.grpc.xds.XdsTransportFactory.EventHandler; +import io.grpc.xds.XdsTransportFactory.StreamingCall; +import io.grpc.xds.XdsTransportFactory.XdsTransport; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -55,7 +57,7 @@ final class LoadReportClient { private final InternalLogId logId; private final XdsLogger logger; - private final Channel channel; + private final XdsTransport xdsTransport; private final Context context; private final Node node; private final SynchronizationContext syncContext; @@ -64,7 +66,6 @@ final class LoadReportClient { private final BackoffPolicy.Provider backoffPolicyProvider; @VisibleForTesting final LoadStatsManager2 loadStatsManager; - private boolean started; @Nullable private BackoffPolicy lrsRpcRetryPolicy; @@ -73,10 +74,12 @@ final class LoadReportClient { @Nullable @VisibleForTesting LrsStream lrsStream; + private static final MethodDescriptor method = + LoadReportingServiceGrpc.getStreamLoadStatsMethod(); LoadReportClient( LoadStatsManager2 loadStatsManager, - Channel channel, + XdsTransport xdsTransport, Context context, Node node, SynchronizationContext syncContext, @@ -84,7 +87,7 @@ final class LoadReportClient { BackoffPolicy.Provider backoffPolicyProvider, Supplier stopwatchSupplier) { this.loadStatsManager = checkNotNull(loadStatsManager, "loadStatsManager"); - this.channel = checkNotNull(channel, "xdsChannel"); + this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport"); this.context = checkNotNull(context, "context"); this.syncContext = checkNotNull(syncContext, "syncContext"); this.timerService = checkNotNull(scheduledExecutorService, "timeService"); @@ -160,66 +163,62 @@ private void startLrsRpc() { return; } checkState(lrsStream == null, "previous lbStream has not been cleared yet"); - lrsStream = new LrsStream(); retryStopwatch.reset().start(); Context prevContext = context.attach(); try { - lrsStream.start(); + lrsStream = new LrsStream(); } finally { context.detach(prevContext); } } - private final class LrsStream { + private final class LrsStream implements EventHandler { boolean initialResponseReceived; boolean closed; long intervalNano = -1; boolean reportAllClusters; List clusterNames; // clusters to report loads for, if not report all. ScheduledHandle loadReportTimer; - StreamObserver lrsRequestWriterV3; - - void start() { - StreamObserver lrsResponseReaderV3 = - new StreamObserver() { - @Override - public void onNext(final LoadStatsResponse response) { - syncContext.execute(new Runnable() { - @Override - public void run() { - logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response); - handleRpcResponse(response.getClustersList(), response.getSendAllClusters(), - Durations.toNanos(response.getLoadReportingInterval())); - } - }); - } - - @Override - public void onError(final Throwable t) { - syncContext.execute(new Runnable() { - @Override - public void run() { - handleRpcError(t); - } - }); - } + private final StreamingCall call; - @Override - public void onCompleted() { - syncContext.execute(new Runnable() { - @Override - public void run() { - handleRpcCompleted(); - } - }); - } - }; - lrsRequestWriterV3 = LoadReportingServiceGrpc.newStub(channel).withWaitForReady() - .streamLoadStats(lrsResponseReaderV3); + LrsStream() { + this.call = xdsTransport.createStreamingCall(method.getFullMethodName(), + method.getRequestMarshaller(), method.getResponseMarshaller()); + call.start(this); logger.log(XdsLogLevel.DEBUG, "Sending initial LRS request"); sendLoadStatsRequest(Collections.emptyList()); } + @Override + public void onReady() {} + + @Override + public void onRecvMessage(LoadStatsResponse response) { + syncContext.execute(new Runnable() { + @Override + public void run() { + logger.log(XdsLogLevel.DEBUG, "Received LRS response:\n{0}", response); + handleRpcResponse(response.getClustersList(), response.getSendAllClusters(), + Durations.toNanos(response.getLoadReportingInterval())); + call.startRecvMessage(); + } + }); + } + + @Override + public void onStatusReceived(final Status status) { + syncContext.execute(new Runnable() { + @Override + public void run() { + if (status.isOk()) { + handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server")); + } else { + handleStreamClosed(status); + } + } + }); + } + void sendLoadStatsRequest(List clusterStatsList) { LoadStatsRequest.Builder requestBuilder = LoadStatsRequest.newBuilder().setNode(node.toEnvoyProtoNode()); @@ -227,14 +226,10 @@ void sendLoadStatsRequest(List clusterStatsList) { requestBuilder.addClusterStats(buildClusterStats(stats)); } LoadStatsRequest request = requestBuilder.build(); - lrsRequestWriterV3.onNext(request); + call.sendMessage(request); logger.log(XdsLogLevel.DEBUG, "Sent LoadStatsRequest\n{0}", request); } - void sendError(Exception error) { - lrsRequestWriterV3.onError(error); - } - void handleRpcResponse(List clusters, boolean sendAllClusters, long loadReportIntervalNano) { if (closed) { @@ -256,14 +251,6 @@ void handleRpcResponse(List clusters, boolean sendAllClusters, scheduleNextLoadReport(); } - void handleRpcError(Throwable t) { - handleStreamClosed(Status.fromThrowable(t)); - } - - void handleRpcCompleted() { - handleStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server")); - } - private void sendLoadReport() { if (closed) { return; @@ -330,7 +317,7 @@ private void close(Exception error) { } closed = true; cleanUp(); - sendError(error); + call.sendError(error); } private void cleanUp() { diff --git a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java index 5aabd976085..c4d5589e57c 100644 --- a/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java +++ b/xds/src/main/java/io/grpc/xds/SharedXdsClientPoolProvider.java @@ -17,6 +17,7 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.xds.GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY; import com.google.common.annotations.VisibleForTesting; import io.grpc.Context; @@ -26,7 +27,6 @@ import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.TimeProvider; import io.grpc.xds.Bootstrapper.BootstrapInfo; -import io.grpc.xds.XdsClientImpl.XdsChannelFactory; import io.grpc.xds.XdsNameResolverProvider.XdsClientPoolFactory; import io.grpc.xds.internal.security.TlsContextManagerImpl; import java.util.Map; @@ -124,7 +124,7 @@ public XdsClient getObject() { if (refCount == 0) { scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE); xdsClient = new XdsClientImpl( - XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY, + DEFAULT_XDS_TRANSPORT_FACTORY, bootstrapInfo, context, scheduler, diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index 0e5e7106c17..17323396409 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -31,12 +31,9 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.Any; -import io.grpc.ChannelCredentials; import io.grpc.Context; -import io.grpc.Grpc; import io.grpc.InternalLogId; import io.grpc.LoadBalancerRegistry; -import io.grpc.ManagedChannel; import io.grpc.Status; import io.grpc.SynchronizationContext; import io.grpc.SynchronizationContext.ScheduledHandle; @@ -100,7 +97,7 @@ public void uncaughtException(Thread t, Throwable e) { private final Map> subscribedResourceTypeUrls = new HashMap<>(); private final Map loadStatsManagerMap = new HashMap<>(); private final Map serverLrsClientMap = new HashMap<>(); - private final XdsChannelFactory xdsChannelFactory; + private final XdsTransportFactory xdsTransportFactory; private final Bootstrapper.BootstrapInfo bootstrapInfo; private final Context context; private final ScheduledExecutorService timeService; @@ -113,7 +110,7 @@ public void uncaughtException(Thread t, Throwable e) { private volatile boolean isShutdown; XdsClientImpl( - XdsChannelFactory xdsChannelFactory, + XdsTransportFactory xdsTransportFactory, Bootstrapper.BootstrapInfo bootstrapInfo, Context context, ScheduledExecutorService timeService, @@ -121,7 +118,7 @@ public void uncaughtException(Thread t, Throwable e) { Supplier stopwatchSupplier, TimeProvider timeProvider, TlsContextManager tlsContextManager) { - this.xdsChannelFactory = xdsChannelFactory; + this.xdsTransportFactory = xdsTransportFactory; this.bootstrapInfo = bootstrapInfo; this.context = context; this.timeService = timeService; @@ -142,8 +139,9 @@ private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) { if (serverChannelMap.containsKey(serverInfo)) { return; } + XdsTransportFactory.XdsTransport xdsTransport = xdsTransportFactory.create(serverInfo); ControlPlaneClient xdsChannel = new ControlPlaneClient( - xdsChannelFactory, + xdsTransport, serverInfo, bootstrapInfo.node(), this, @@ -157,7 +155,7 @@ private void maybeCreateXdsChannelWithLrs(ServerInfo serverInfo) { LoadStatsManager2 loadStatsManager = new LoadStatsManager2(stopwatchSupplier); loadStatsManagerMap.put(serverInfo, loadStatsManager); LoadReportClient lrsClient = new LoadReportClient( - loadStatsManager, xdsChannel.channel(), context, bootstrapInfo.node(), syncContext, + loadStatsManager, xdsTransport, context, bootstrapInfo.node(), syncContext, timeService, backoffPolicyProvider, stopwatchSupplier); serverChannelMap.put(serverInfo, xdsChannel); serverLrsClientMap.put(serverInfo, lrsClient); @@ -747,19 +745,4 @@ private void notifyWatcher(ResourceWatcher watcher, T update) { watcher.onChanged(update); } } - - abstract static class XdsChannelFactory { - static final XdsChannelFactory DEFAULT_XDS_CHANNEL_FACTORY = new XdsChannelFactory() { - @Override - ManagedChannel create(ServerInfo serverInfo) { - String target = serverInfo.target(); - ChannelCredentials channelCredentials = serverInfo.channelCredentials(); - return Grpc.newChannelBuilder(target, channelCredentials) - .keepAliveTime(5, TimeUnit.MINUTES) - .build(); - } - }; - - abstract ManagedChannel create(ServerInfo serverInfo); - } } diff --git a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java index 910a9fc3285..49325ea7674 100644 --- a/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java +++ b/xds/src/test/java/io/grpc/xds/LoadReportClientTest.java @@ -174,7 +174,9 @@ public void cancelled(Context context) { when(backoffPolicy2.nextBackoffNanos()) .thenReturn(TimeUnit.SECONDS.toNanos(2L), TimeUnit.SECONDS.toNanos(20L)); addFakeStatsData(); - lrsClient = new LoadReportClient(loadStatsManager, channel, Context.ROOT, NODE, + lrsClient = new LoadReportClient(loadStatsManager, + GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY.createForTest(channel), + Context.ROOT, NODE, syncContext, fakeClock.getScheduledExecutorService(), backoffPolicyProvider, fakeClock.getStopwatchSupplier()); syncContext.execute(new Runnable() { diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index 06f4dd9a625..4d714bf37d7 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -18,7 +18,7 @@ import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertWithMessage; -import static io.grpc.xds.XdsClientImpl.XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY; +import static io.grpc.xds.GrpcXdsTransportFactory.DEFAULT_XDS_TRANSPORT_FACTORY; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; @@ -85,13 +85,13 @@ import io.grpc.xds.EnvoyServerProtoData.FilterChain; import io.grpc.xds.EnvoyServerProtoData.SuccessRateEjection; import io.grpc.xds.FaultConfig.FractionalPercent.DenominatorType; +import io.grpc.xds.GrpcXdsTransportFactory.GrpcXdsTransport; import io.grpc.xds.LoadStatsManager2.ClusterDropStats; import io.grpc.xds.XdsClient.ResourceMetadata; import io.grpc.xds.XdsClient.ResourceMetadata.ResourceMetadataStatus; import io.grpc.xds.XdsClient.ResourceMetadata.UpdateFailureState; import io.grpc.xds.XdsClient.ResourceUpdate; import io.grpc.xds.XdsClient.ResourceWatcher; -import io.grpc.xds.XdsClientImpl.XdsChannelFactory; import io.grpc.xds.XdsClusterResource.CdsUpdate; import io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType; import io.grpc.xds.XdsEndpointResource.EdsUpdate; @@ -322,25 +322,25 @@ public void setUp() throws IOException { .start()); channel = cleanupRule.register(InProcessChannelBuilder.forName(serverName).directExecutor().build()); - XdsChannelFactory xdsChannelFactory = new XdsChannelFactory() { + XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() { @Override - ManagedChannel create(ServerInfo serverInfo) { + public XdsTransport create(ServerInfo serverInfo) { if (serverInfo.target().equals(SERVER_URI)) { - return channel; + return new GrpcXdsTransport(channel); } if (serverInfo.target().equals(SERVER_URI_CUSTOME_AUTHORITY)) { if (channelForCustomAuthority == null) { channelForCustomAuthority = cleanupRule.register( InProcessChannelBuilder.forName(serverName).directExecutor().build()); } - return channelForCustomAuthority; + return new GrpcXdsTransport(channelForCustomAuthority); } if (serverInfo.target().equals(SERVER_URI_EMPTY_AUTHORITY)) { if (channelForEmptyAuthority == null) { channelForEmptyAuthority = cleanupRule.register( InProcessChannelBuilder.forName(serverName).directExecutor().build()); } - return channelForEmptyAuthority; + return new GrpcXdsTransport(channelForEmptyAuthority); } throw new IllegalArgumentException("Can not create channel for " + serverInfo); } @@ -368,7 +368,7 @@ ManagedChannel create(ServerInfo serverInfo) { .build(); xdsClient = new XdsClientImpl( - xdsChannelFactory, + xdsTransportFactory, bootstrapInfo, Context.ROOT, fakeClock.getScheduledExecutorService(), @@ -3743,7 +3743,7 @@ public void sendToNonexistentHost() throws Exception { private XdsClientImpl createXdsClient(String serverUri) { BootstrapInfo bootstrapInfo = buildBootStrap(serverUri); return new XdsClientImpl( - DEFAULT_XDS_CHANNEL_FACTORY, + DEFAULT_XDS_TRANSPORT_FACTORY, bootstrapInfo, Context.ROOT, fakeClock.getScheduledExecutorService(),