Skip to content

Commit

Permalink
Support aggregate clusters correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-safran committed Jan 7, 2025
1 parent 2ba1df7 commit b0635a6
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 7 deletions.
15 changes: 14 additions & 1 deletion xds/src/main/java/io/grpc/xds/XdsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public static class XdsClusterConfig {

@Override
public int hashCode() {
return clusterName.hashCode() + clusterResource.hashCode() + endpoint.hashCode();
int endpointHash = (endpoint != null) ? endpoint.hashCode() : 0;
return clusterName.hashCode() + clusterResource.hashCode() + endpointHash;
}

@Override
Expand All @@ -107,6 +108,18 @@ public String toString() {
.append(", endpoint=").append(endpoint).append("}");
return builder.toString();
}

public String getClusterName() {
return clusterName;
}

public CdsUpdate getClusterResource() {
return clusterResource;
}

public StatusOr<EdsUpdate> getEndpoint() {
return endpoint;
}
}

static class XdsConfigBuilder {
Expand Down
14 changes: 9 additions & 5 deletions xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -247,9 +247,11 @@ private void buildConfig() {
XdsConfig.XdsClusterConfig clusterConfig;
String edsName = cdsUpdate.getValue().edsServiceName();
EdsWatcher edsWatcher = (EdsWatcher) edsWatchers.get(edsName);
assert edsWatcher != null;
clusterConfig = new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate.getValue(),
edsWatcher.getData());

// Only EDS type clusters have endpoint data
StatusOr<XdsEndpointResource.EdsUpdate> data =
edsWatcher != null ? edsWatcher.getData() : null;
clusterConfig = new XdsConfig.XdsClusterConfig(clusterName, cdsUpdate.getValue(), data);
builder.addCluster(clusterName, StatusOr.fromValue(clusterConfig));
} else {
builder.addCluster(clusterName, StatusOr.fromStatus(cdsUpdate.getStatus()));
Expand Down Expand Up @@ -472,7 +474,7 @@ public void onChanged(XdsClusterResource.CdsUpdate update) {
// no eds needed
break;
case AGGREGATE:
if (data.hasValue()) {
if (data != null && data.hasValue()) {
Set<String> oldNames = new HashSet<>(data.getValue().prioritizedClusterNames());
Set<String> newNames = new HashSet<>(update.prioritizedClusterNames());

Expand All @@ -488,7 +490,9 @@ public void onChanged(XdsClusterResource.CdsUpdate update) {
}
} else {
setData(update);
update.prioritizedClusterNames().forEach(name -> addWatcher(new CdsWatcher(name)));
for (String name : update.prioritizedClusterNames()) {
addWatcher(new CdsWatcher(name));
}
}
break;
default:
Expand Down
49 changes: 49 additions & 0 deletions xds/src/test/java/io/grpc/xds/XdsDependencyManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,24 @@
package io.grpc.xds;

import static com.google.common.truth.Truth.assertThat;
import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.AGGREGATE;
import static io.grpc.xds.XdsClusterResource.CdsUpdate.ClusterType.EDS;
import static io.grpc.xds.XdsTestControlPlaneService.ADS_TYPE_URL_RDS;
import static io.grpc.xds.XdsTestUtils.getEdsNameForCluster;
import static io.grpc.xds.client.CommonBootstrapperTestUtils.SERVER_URI;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import com.google.common.collect.ImmutableMap;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.grpc.BindableService;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
Expand All @@ -41,6 +49,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -157,6 +166,46 @@ public void verify_config_update() {
assertThat(testWatcher.lastConfig).isNotEqualTo(defaultXdsConfig);
}

@Test
public void verify_simple_aggregate() {
InOrder inOrder = org.mockito.Mockito.inOrder(xdsConfigWatcher);
xdsDependencyManager = new XdsDependencyManager(
xdsClient, xdsConfigWatcher, syncContext, serverName, serverName);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(defaultXdsConfig);

List<String> childNames = Arrays.asList("clusterC", "clusterB", "clusterA");
String rootName = "root_c";

RouteConfiguration routeConfig =
XdsTestUtils.buildRouteConfiguration(serverName, XdsTestUtils.RDS_NAME, rootName);
controlPlaneService.setXdsConfig(
ADS_TYPE_URL_RDS, ImmutableMap.of(XdsTestUtils.RDS_NAME, routeConfig));

XdsTestUtils.setAggregateCdsConfig(controlPlaneService, serverName, rootName, childNames);
inOrder.verify(xdsConfigWatcher, timeout(1000)).onUpdate(any());

Map<String, StatusOr<XdsConfig.XdsClusterConfig>> lastConfigClusters =
testWatcher.lastConfig.clusters;
assertThat(lastConfigClusters).hasSize(childNames.size() + 1);
StatusOr<XdsConfig.XdsClusterConfig> rootC = lastConfigClusters.get(rootName);
XdsClusterResource.CdsUpdate rootUpdate = rootC.getValue().clusterResource;
assertThat(rootUpdate.clusterType()).isEqualTo(AGGREGATE);
assertThat(rootUpdate.prioritizedClusterNames()).isEqualTo(childNames);

for (String childName : childNames) {
assertThat(lastConfigClusters).containsKey(childName);
XdsClusterResource.CdsUpdate childResource =
lastConfigClusters.get(childName).getValue().clusterResource;
assertThat(childResource.clusterType()).isEqualTo(EDS);
assertThat(childResource.edsServiceName()).isEqualTo(getEdsNameForCluster(childName));

StatusOr<XdsEndpointResource.EdsUpdate> endpoint =
lastConfigClusters.get(childName).getValue().getEndpoint();
assertThat(endpoint.hasValue()).isTrue();
assertThat(endpoint.getValue().clusterName).isEqualTo(getEdsNameForCluster(childName));
}
}

private static class TestWatcher implements XdsDependencyManager.XdsConfigWatcher {
XdsConfig lastConfig;
int numUpdates = 0;
Expand Down
39 changes: 38 additions & 1 deletion xds/src/test/java/io/grpc/xds/XdsTestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import com.google.protobuf.BoolValue;
import com.google.protobuf.Message;
import com.google.protobuf.util.Durations;
Expand All @@ -40,6 +41,7 @@
import io.envoyproxy.envoy.config.route.v3.RouteAction;
import io.envoyproxy.envoy.config.route.v3.RouteConfiguration;
import io.envoyproxy.envoy.config.route.v3.RouteMatch;
import io.envoyproxy.envoy.extensions.clusters.aggregate.v3.ClusterConfig;
import io.envoyproxy.envoy.service.load_stats.v3.LoadReportingServiceGrpc;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsRequest;
import io.envoyproxy.envoy.service.load_stats.v3.LoadStatsResponse;
Expand Down Expand Up @@ -69,7 +71,7 @@

public class XdsTestUtils {
private static final Logger log = Logger.getLogger(XdsTestUtils.class.getName());
private static final String RDS_NAME = "route-config.googleapis.com";
static final String RDS_NAME = "route-config.googleapis.com";
private static final String CLUSTER_NAME = "cluster0";
private static final String EDS_NAME = "eds-service-0";
private static final String SERVER_LISTENER = "grpc/server?udpa.resource.listening_address=";
Expand Down Expand Up @@ -148,6 +150,41 @@ static void setAdsConfig(XdsTestControlPlaneService service, String serverName,

}

static String getEdsNameForCluster(String clusterName) {
return "eds_" + clusterName;
}

static void setAggregateCdsConfig(XdsTestControlPlaneService service, String serverName,
String clusterName, List<String> children) {
Map<String, Message> clusterMap = new HashMap<>();

ClusterConfig rootConfig = ClusterConfig.newBuilder().addAllClusters(children).build();
Cluster.CustomClusterType type =
Cluster.CustomClusterType.newBuilder()
.setName(XdsClusterResource.AGGREGATE_CLUSTER_TYPE_NAME)
.setTypedConfig(Any.pack(rootConfig))
.build();
Cluster.Builder builder = Cluster.newBuilder().setName(clusterName).setClusterType(type);
builder.setLbPolicy(Cluster.LbPolicy.ROUND_ROBIN);
Cluster cluster = builder.build();
clusterMap.put(clusterName, cluster);

for (String child : children) {
Cluster childCluster = ControlPlaneRule.buildCluster(child, getEdsNameForCluster(child));
clusterMap.put(child, childCluster);
}

service.setXdsConfig(ADS_TYPE_URL_CDS, clusterMap);

Map<String, Message> edsMap = new HashMap<>();
for (String child : children) {
ClusterLoadAssignment clusterLoadAssignment = ControlPlaneRule.buildClusterLoadAssignment(
serverName, ENDPOINT_HOSTNAME, ENDPOINT_PORT, getEdsNameForCluster(child));
edsMap.put(getEdsNameForCluster(child), clusterLoadAssignment);
}
service.setXdsConfig(ADS_TYPE_URL_EDS, edsMap);
}

static XdsConfig getDefaultXdsConfig(String serverHostName)
throws XdsResourceType.ResourceInvalidException, IOException {
XdsConfig.XdsConfigBuilder builder = new XdsConfig.XdsConfigBuilder();
Expand Down

0 comments on commit b0635a6

Please sign in to comment.