Skip to content

Commit

Permalink
xds: change controlPlaneClient and loadReportClient to use xdsTranspo…
Browse files Browse the repository at this point in the history
…rtFactory (grpc#10829)
  • Loading branch information
YifeiZhuang authored Jan 26, 2024
1 parent 6d96e65 commit 8e1cc94
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 249 deletions.
241 changes: 90 additions & 151 deletions xds/src/main/java/io/grpc/xds/ControlPlaneClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,22 @@
import io.envoyproxy.envoy.service.discovery.v3.AggregatedDiscoveryServiceGrpc;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryRequest;
import io.envoyproxy.envoy.service.discovery.v3.DiscoveryResponse;
import io.grpc.Channel;
import io.grpc.Context;
import io.grpc.InternalLogId;
import io.grpc.ManagedChannel;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.BackoffPolicy;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.EnvoyProtoData.Node;
import io.grpc.xds.XdsClient.ProcessingTracker;
import io.grpc.xds.XdsClient.ResourceStore;
import io.grpc.xds.XdsClient.XdsResponseHandler;
import io.grpc.xds.XdsClientImpl.XdsChannelFactory;
import io.grpc.xds.XdsLogger.XdsLogLevel;
import io.grpc.xds.XdsTransportFactory.EventHandler;
import io.grpc.xds.XdsTransportFactory.StreamingCall;
import io.grpc.xds.XdsTransportFactory.XdsTransport;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -67,7 +66,7 @@ final class ControlPlaneClient {
private final InternalLogId logId;
private final XdsLogger logger;
private final ServerInfo serverInfo;
private final ManagedChannel channel;
private final XdsTransport xdsTransport;
private final XdsResponseHandler xdsResponseHandler;
private final ResourceStore resourceStore;
private final Context context;
Expand All @@ -84,7 +83,7 @@ final class ControlPlaneClient {

private boolean shutdown;
@Nullable
private AbstractAdsStream adsStream;
private AdsStream adsStream;
@Nullable
private BackoffPolicy retryBackoffPolicy;
@Nullable
Expand All @@ -93,7 +92,7 @@ final class ControlPlaneClient {
/** An entity that manages ADS RPCs over a single channel. */
// TODO: rename to XdsChannel
ControlPlaneClient(
XdsChannelFactory xdsChannelFactory,
XdsTransport xdsTransport,
ServerInfo serverInfo,
Node bootstrapNode,
XdsResponseHandler xdsResponseHandler,
Expand All @@ -106,7 +105,7 @@ final class ControlPlaneClient {
Supplier<Stopwatch> stopwatchSupplier,
XdsClient.TimerLaunch timerLaunch) {
this.serverInfo = checkNotNull(serverInfo, "serverInfo");
this.channel = checkNotNull(xdsChannelFactory, "xdsChannelFactory").create(serverInfo);
this.xdsTransport = checkNotNull(xdsTransport, "xdsTransport");
this.xdsResponseHandler = checkNotNull(xdsResponseHandler, "xdsResponseHandler");
this.resourceStore = checkNotNull(resourceStore, "resourcesSubscriber");
this.bootstrapNode = checkNotNull(bootstrapNode, "bootstrapNode");
Expand All @@ -121,12 +120,6 @@ final class ControlPlaneClient {
logger.log(XdsLogLevel.INFO, "Created");
}

/** The underlying channel. */
// Currently, only externally used for LrsClient.
Channel channel() {
return channel;
}

void shutdown() {
syncContext.execute(new Runnable() {
@Override
Expand All @@ -139,7 +132,7 @@ public void run() {
if (rpcRetryTimer != null && rpcRetryTimer.isPending()) {
rpcRetryTimer.cancel();
}
channel.shutdown();
xdsTransport.shutdown();
}
});
}
Expand Down Expand Up @@ -207,7 +200,7 @@ boolean isInBackoff() {
}

boolean isReady() {
return adsStream != null && adsStream.isReady();
return adsStream != null && adsStream.call != null && adsStream.call.isReady();
}

/**
Expand All @@ -234,10 +227,9 @@ void readyHandler() {
// Must be synchronized.
private void startRpcStream() {
checkState(adsStream == null, "Previous adsStream has not been cleared yet");
adsStream = new AdsStreamV3();
Context prevContext = context.attach();
try {
adsStream.start();
adsStream = new AdsStream();
} finally {
context.detach(prevContext);
}
Expand Down Expand Up @@ -271,7 +263,7 @@ XdsResourceType<?> fromTypeUrl(String typeUrl) {
return resourceStore.getSubscribedResourceTypesWithTypeUrl().get(typeUrl);
}

private abstract class AbstractAdsStream {
private class AdsStream implements EventHandler<DiscoveryResponse> {
private boolean responseReceived;
private boolean closed;
// Response nonce for the most recently received discovery responses of each resource type.
Expand All @@ -281,23 +273,46 @@ private abstract class AbstractAdsStream {
// To avoid confusion, client-initiated requests will always use the nonce in
// most recently received responses of each resource type.
private final Map<XdsResourceType<?>, String> respNonces = new HashMap<>();

abstract void start();

abstract void sendError(Exception error);

abstract boolean isReady();

abstract void request(int count);
private final StreamingCall<DiscoveryRequest, DiscoveryResponse> call;
private final MethodDescriptor<DiscoveryRequest, DiscoveryResponse> methodDescriptor =
AggregatedDiscoveryServiceGrpc.getStreamAggregatedResourcesMethod();

private AdsStream() {
this.call = xdsTransport.createStreamingCall(methodDescriptor.getFullMethodName(),
methodDescriptor.getRequestMarshaller(), methodDescriptor.getResponseMarshaller());
call.start(this);
}

/**
* Sends a discovery request with the given {@code versionInfo}, {@code nonce} and
* {@code errorDetail}. Used for reacting to a specific discovery response. For
* client-initiated discovery requests, use {@link
* #sendDiscoveryRequest(XdsResourceType, Collection)}.
*/
abstract void sendDiscoveryRequest(XdsResourceType<?> type, String version,
Collection<String> resources, String nonce, @Nullable String errorDetail);
void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
Collection<String> resources, String nonce,
@Nullable String errorDetail) {
DiscoveryRequest.Builder builder =
DiscoveryRequest.newBuilder()
.setVersionInfo(versionInfo)
.setNode(bootstrapNode.toEnvoyProtoNode())
.addAllResourceNames(resources)
.setTypeUrl(type.typeUrl())
.setResponseNonce(nonce);
if (errorDetail != null) {
com.google.rpc.Status error =
com.google.rpc.Status.newBuilder()
.setCode(Code.INVALID_ARGUMENT_VALUE) // FIXME(chengyuanzhang): use correct code
.setMessage(errorDetail)
.build();
builder.setErrorDetail(error);
}
DiscoveryRequest request = builder.build();
call.sendMessage(request);
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", MessagePrinter.print(request));
}
}

/**
* Sends a client-initiated discovery request.
Expand All @@ -308,6 +323,48 @@ final void sendDiscoveryRequest(XdsResourceType<?> type, Collection<String> reso
respNonces.getOrDefault(type, ""), null);
}

@Override
public void onReady() {
readyHandler();
}

@Override
public void onRecvMessage(DiscoveryResponse response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(
XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
MessagePrinter.print(response));
}
if (type == null) {
logger.log(
XdsLogLevel.WARNING,
"Ignore an unknown type of DiscoveryResponse: {0}",
response.getTypeUrl());

call.startRecvMessage();
return;
}
handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
response.getNonce());
}
});
}

@Override
public void onStatusReceived(final Status status) {
syncContext.execute(() -> {
if (status.isOk()) {
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
} else {
handleRpcStreamClosed(status);
}
});
}

final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<Any> resources,
String nonce) {
checkNotNull(type, "type");
Expand All @@ -316,20 +373,13 @@ final void handleRpcResponse(XdsResourceType<?> type, String versionInfo, List<A
}
responseReceived = true;
respNonces.put(type, nonce);
ProcessingTracker processingTracker = new ProcessingTracker(() -> request(1), syncContext);
ProcessingTracker processingTracker = new ProcessingTracker(
() -> call.startRecvMessage(), syncContext);
xdsResponseHandler.handleResourceResponse(type, serverInfo, versionInfo, resources, nonce,
processingTracker);
processingTracker.onComplete();
}

final void handleRpcError(Throwable t) {
handleRpcStreamClosed(Status.fromThrowable(t));
}

final void handleRpcCompleted() {
handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER));
}

private void handleRpcStreamClosed(Status error) {
if (closed) {
return;
Expand Down Expand Up @@ -366,7 +416,7 @@ private void close(Exception error) {
}
closed = true;
cleanUp();
sendError(error);
call.sendError(error);
}

private void cleanUp() {
Expand All @@ -375,115 +425,4 @@ private void cleanUp() {
}
}
}

private final class AdsStreamV3 extends AbstractAdsStream {
private ClientCallStreamObserver<DiscoveryRequest> requestWriter;

@Override
public boolean isReady() {
return requestWriter != null && ((ClientCallStreamObserver<?>) requestWriter).isReady();
}

@Override
@SuppressWarnings("unchecked")
void start() {
AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub stub =
AggregatedDiscoveryServiceGrpc.newStub(channel);

final class AdsClientResponseObserver
implements ClientResponseObserver<DiscoveryRequest, DiscoveryResponse> {

@Override
public void beforeStart(ClientCallStreamObserver<DiscoveryRequest> requestStream) {
requestStream.disableAutoRequestWithInitial(1);
requestStream.setOnReadyHandler(ControlPlaneClient.this::readyHandler);
}

@Override
public void onNext(final DiscoveryResponse response) {
syncContext.execute(new Runnable() {
@Override
public void run() {
XdsResourceType<?> type = fromTypeUrl(response.getTypeUrl());
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(
XdsLogLevel.DEBUG, "Received {0} response:\n{1}", type,
MessagePrinter.print(response));
}
if (type == null) {
logger.log(
XdsLogLevel.WARNING,
"Ignore an unknown type of DiscoveryResponse: {0}",
response.getTypeUrl());
request(1);
return;
}
handleRpcResponse(type, response.getVersionInfo(), response.getResourcesList(),
response.getNonce());
}
});
}

@Override
public void onError(final Throwable t) {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcError(t);
}
});
}

@Override
public void onCompleted() {
syncContext.execute(new Runnable() {
@Override
public void run() {
handleRpcCompleted();
}
});
}
}

requestWriter = (ClientCallStreamObserver) stub.streamAggregatedResources(
new AdsClientResponseObserver());
}

@Override
void sendDiscoveryRequest(XdsResourceType<?> type, String versionInfo,
Collection<String> resources, String nonce,
@Nullable String errorDetail) {
checkState(requestWriter != null, "ADS stream has not been started");
DiscoveryRequest.Builder builder =
DiscoveryRequest.newBuilder()
.setVersionInfo(versionInfo)
.setNode(bootstrapNode.toEnvoyProtoNode())
.addAllResourceNames(resources)
.setTypeUrl(type.typeUrl())
.setResponseNonce(nonce);
if (errorDetail != null) {
com.google.rpc.Status error =
com.google.rpc.Status.newBuilder()
.setCode(Code.INVALID_ARGUMENT_VALUE) // FIXME(chengyuanzhang): use correct code
.setMessage(errorDetail)
.build();
builder.setErrorDetail(error);
}
DiscoveryRequest request = builder.build();
requestWriter.onNext(request);
if (logger.isLoggable(XdsLogLevel.DEBUG)) {
logger.log(XdsLogLevel.DEBUG, "Sent DiscoveryRequest\n{0}", MessagePrinter.print(request));
}
}

@Override
void request(int count) {
requestWriter.request(count);
}

@Override
void sendError(Exception error) {
requestWriter.onError(error);
}
}
}
Loading

0 comments on commit 8e1cc94

Please sign in to comment.