Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-safran committed Dec 18, 2024
1 parent f995211 commit 66b0268
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 44 deletions.
61 changes: 49 additions & 12 deletions xds/src/main/java/io/grpc/xds/XdsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,68 @@

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkNotNull;

import io.grpc.StatusOr;
import io.grpc.xds.XdsClusterResource.CdsUpdate;
import io.grpc.xds.XdsEndpointResource.EdsUpdate;
import io.grpc.xds.XdsListenerResource.LdsUpdate;
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
import java.util.HashMap;
import java.util.Map;

public class XdsConfig {
public final XdsListenerResource listener;
public final XdsRouteConfigureResource route;
public final Map<String, StatusOr<XdsClusterConfig>> clusters;
/**
* Represents the xDS configuration tree for a specified Listener.
*/
class XdsConfig {
final LdsUpdate listener;
final RdsUpdate route;
final Map<String, StatusOr<XdsClusterConfig>> clusters;

public XdsConfig(XdsListenerResource listener, XdsRouteConfigureResource route,
Map<String, StatusOr<XdsClusterConfig>> clusters) {
XdsConfig(LdsUpdate listener, RdsUpdate route, Map<String, StatusOr<XdsClusterConfig>> clusters) {
this.listener = listener;
this.route = route;
this.clusters = clusters;
}

public static class XdsClusterConfig {
public final String clusterName;
public final XdsClusterResource clusterResource;
public final XdsEndpointResource endpoint;
static class XdsClusterConfig {
final String clusterName;
final CdsUpdate clusterResource;
final StatusOr<EdsUpdate> endpoint;

public XdsClusterConfig(String clusterName, XdsClusterResource clusterResource,
XdsEndpointResource endpoint) {
XdsClusterConfig(String clusterName, CdsUpdate clusterResource,
StatusOr<EdsUpdate> endpoint) {
this.clusterName = clusterName;
this.clusterResource = clusterResource;
this.endpoint = endpoint;
}
}

static class XdsConfigBuilder {
private LdsUpdate listener;
private RdsUpdate route;
private Map<String, StatusOr<XdsClusterConfig>> clusters = new HashMap<>();

XdsConfigBuilder setListener(LdsUpdate listener) {
this.listener = listener;
return this;
}

XdsConfigBuilder setRoute(RdsUpdate route) {
this.route = route;
return this;
}

XdsConfigBuilder addCluster(String name, StatusOr<XdsClusterConfig> clusterConfig) {
clusters.put(name, clusterConfig);
return this;
}

XdsConfig build() {
checkNotNull(listener, "listener");
checkNotNull(route, "route");
return new XdsConfig(listener, route, clusters);
}
}

}
181 changes: 149 additions & 32 deletions xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package io.grpc.xds;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static io.grpc.xds.client.XdsClient.ResourceUpdate;
import static io.grpc.xds.client.XdsLogger.XdsLogLevel.DEBUG;

import io.grpc.InternalLogId;
import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate;
import io.grpc.xds.client.XdsClient;
Expand All @@ -34,6 +36,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;

/**
* This class acts as a layer of indirection between the XdsClient and the NameResolver. It
Expand All @@ -51,7 +54,7 @@ final class XdsDependencyManager implements XdsClusterSubscriptionRegistry {
private final InternalLogId logId;
private final XdsLogger logger;
private XdsConfig lastXdsConfig = null;
private Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();

XdsDependencyManager(XdsClient xdsClient, XdsConfigWatcher xdsConfigWatcher,
SynchronizationContext syncContext, String dataPlaneAuthority,
Expand Down Expand Up @@ -87,8 +90,8 @@ public void refreshDynamicSubscriptions() {

@SuppressWarnings("unchecked")
private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
XdsResourceType<T> type = watcher.resourceContext.getResourceType();
String resourceName = watcher.resourceContext.resourceName;
XdsResourceType<T> type = watcher.type;
String resourceName = watcher.resourceName;

this.syncContext.executeLater(() -> {
TypeWatchers<T> typeWatchers = (TypeWatchers<T>)resourceWatchers.get(type);
Expand All @@ -110,7 +113,7 @@ public void shutdown() {
}

private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
for (Map.Entry<String, ResourceWatcher<T>> watcherEntry : watchers.watchers.entrySet()) {
for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
watcherEntry.getValue());
}
Expand All @@ -133,9 +136,60 @@ private void releaseSubscription(ClusterSubscription subscription) {
}
}

/**
* Check if all resources have results, and if so, generate a new XdsConfig and send it to all
* the watchers.
*/
private void maybePublishConfig() {
// Check if all resources have results and if so generate a new XdsConfig and send it to all
// the watchers
boolean waitingOnResource = resourceWatchers.values().stream()
.flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
.anyMatch(watcher -> !watcher.hasResult());
if (waitingOnResource) {
return;
}

buildConfig();
xdsConfigWatcher.onUpdate(lastXdsConfig);
}

private void buildConfig() {
XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();

// Iterate watchers and build the XdsConfig

// Will only be 1 listener and 1 route resource
resourceWatchers.get(XdsListenerResource.getInstance()).watchers.values().stream()
.map(watcher -> (LdsWatcher) watcher)
.forEach(watcher -> builder.setListener(watcher.getData().getValue()));

resourceWatchers.get(XdsRouteConfigureResource.getInstance()).watchers.values().stream()
.map(watcher -> (RdsWatcher) watcher)
.forEach(watcher -> builder.setRoute(watcher.getData().getValue()));

Map<String, ? extends XdsWatcherBase<?>> edsWatchers =
resourceWatchers.get(XdsEndpointResource.getInstance()).watchers;
Map<String, ? extends XdsWatcherBase<?>> cdsWatchers =
resourceWatchers.get(XdsClusterResource.getInstance()).watchers;

// Iterate CDS watchers
for (XdsWatcherBase<?> watcher : cdsWatchers.values()) {
CdsWatcher cdsWatcher = (CdsWatcher) watcher;
String clusterName = cdsWatcher.resourceName();
StatusOr<XdsClusterResource.CdsUpdate> cdsUpdate = cdsWatcher.getData();
if (cdsUpdate.hasValue()) {
XdsConfig.XdsClusterConfig clusterConfig;
String edsName = cdsUpdate.getValue().edsServiceName();
EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(edsName);
assert edsWatcher != null;
clusterConfig = new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate.getValue(),
edsWatcher.getData());
builder.addCluster(clusterName, StatusOr.fromValue(clusterConfig));
} else {
builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdate.getStatus()));
}
}

lastXdsConfig = builder.build();
}

@Override
Expand All @@ -144,41 +198,31 @@ public String toString() {
}

private static class TypeWatchers<T extends ResourceUpdate> {
Map<String, ResourceWatcher<T>> watchers;
Map<String, XdsWatcherBase<T>> watchers;
XdsResourceType<T> resourceType;

TypeWatchers(XdsResourceType<T> resourceType) {
watchers = new HashMap<>();
this.resourceType = resourceType;
}

public void add(String resourceName, ResourceWatcher<T> watcher) {
public void add(String resourceName, XdsWatcherBase<T> watcher) {
watchers.put(resourceName, watcher);
}
}


public static class ResourceContext<T extends ResourceUpdate> {
String resourceName;
XdsResourceType<T> resourceType;

public ResourceContext(XdsResourceType<T> resourceType, String resourceName) {
this.resourceName = resourceName;
this.resourceType = resourceType;
}

public XdsResourceType<T> getResourceType() {
return resourceType;
}
}

public interface XdsConfigWatcher {

void onUpdate(XdsConfig config);

void onError(ResourceContext<?> resourceContext, Status status);
// These 2 methods are invoked when there is an error or
// does-not-exist on LDS or RDS only. The context will be a
// human-readable string indicating the scope in which the error
// occurred (e.g., the resource type and name).
void onError(String resourceContext, Status status);

void onResourceDoesNotExist(ResourceContext<?> resourceContext);
void onResourceDoesNotExist(String resourceContext);
}

private class ClusterSubscription implements Closeable {
Expand All @@ -198,21 +242,60 @@ public void close() throws IOException {
}
}

private abstract class XdsWatcherBase<T extends ResourceUpdate> implements ResourceWatcher<T> {
final ResourceContext<T> resourceContext;
@SuppressWarnings("ClassCanBeStatic")
private abstract class XdsWatcherBase<T extends ResourceUpdate>
implements ResourceWatcher<T> {
private final XdsResourceType<T> type;
private final String resourceName;
@Nullable
protected StatusOr<T> data;
protected boolean transientError = false;


private XdsWatcherBase(XdsResourceType<T> type, String resourceName) {
this.resourceContext = new ResourceContext<>(type, resourceName);
this.type = type;
this.resourceName = resourceName;
}

@Override
public void onError(Status error) {
xdsConfigWatcher.onError(resourceContext, error);
checkNotNull(error, "error");
data = StatusOr.fromStatus(error);
transientError = true;
}

@Override
public void onResourceDoesNotExist(String resourceName) {
xdsConfigWatcher.onResourceDoesNotExist(resourceContext);
protected void handleDoesNotExist(String resourceName) {
checkArgument(this.resourceName.equals(resourceName), "Resource name does not match");
data = StatusOr.fromStatus(
Status.UNAVAILABLE.withDescription("No " + type + " resource: " + resourceName));
transientError = false;
}

boolean hasResult() {
return data != null;
}

@Nullable
StatusOr<T> getData() {
return data;
}

String resourceName() {
return resourceName;
}

protected void setData(T data) {
checkNotNull(data, "data");
this.data = StatusOr.fromValue(data);
transientError = false;
}

boolean isTransientError() {
return data != null && !data.hasValue() && transientError;
}

String toContextString() {
return type + " resource: " + resourceName;
}
}

Expand All @@ -227,6 +310,19 @@ public void onChanged(XdsListenerResource.LdsUpdate update) {
// TODO: process the update and add an RdsWatcher if needed
// If none needed call maybePublishConfig()
}

@Override
public void onError(Status error) {
super.onError(error);
xdsConfigWatcher.onError(toContextString(), error);
}

@Override
public void onResourceDoesNotExist(String resourceName) {
handleDoesNotExist(resourceName);
xdsConfigWatcher.onResourceDoesNotExist(toContextString());
}

}

private class RdsWatcher extends XdsWatcherBase<RdsUpdate> {
Expand All @@ -240,8 +336,19 @@ public void onChanged(RdsUpdate update) {
// TODO: process the update and add CdsWatchers for all virtual hosts as needed
// If none needed call maybePublishConfig()
}
}

@Override
public void onError(Status error) {
super.onError(error);
xdsConfigWatcher.onError(toContextString(), error);
}

@Override
public void onResourceDoesNotExist(String resourceName) {
handleDoesNotExist(resourceName);
xdsConfigWatcher.onResourceDoesNotExist(toContextString());
}
}

private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {

Expand All @@ -254,6 +361,12 @@ public void onChanged(XdsClusterResource.CdsUpdate update) {
// TODO: process the update and add an EdsWatcher if needed
// else call maybePublishConfig()
}

@Override
public void onResourceDoesNotExist(String resourceName) {
handleDoesNotExist(resourceName);
}

}

private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
Expand All @@ -265,7 +378,11 @@ private EdsWatcher(String resourceName) {
public void onChanged(XdsEndpointResource.EdsUpdate update) {
// TODO: process the update
maybePublishConfig();
}

@Override
public void onResourceDoesNotExist(String resourceName) {
handleDoesNotExist(resourceName);
}
}
}

0 comments on commit 66b0268

Please sign in to comment.