Skip to content

Commit

Permalink
Handle Logical DNS clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-safran committed Feb 27, 2025
1 parent 86ea936 commit d72d674
Show file tree
Hide file tree
Showing 8 changed files with 559 additions and 375 deletions.
245 changes: 8 additions & 237 deletions xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@
import io.grpc.Status;
import io.grpc.StatusOr;
import io.grpc.SynchronizationContext;
import io.grpc.internal.BackoffPolicy;
import io.grpc.internal.ExponentialBackoffPolicy;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.util.OutlierDetectionLoadBalancer;
import io.grpc.util.OutlierDetectionLoadBalancer.OutlierDetectionLoadBalancerConfig;
Expand All @@ -52,8 +49,6 @@
import io.grpc.xds.client.Locality;
import io.grpc.xds.client.XdsLogger;
import io.grpc.xds.client.XdsLogger.XdsLogLevel;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -69,7 +64,6 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
Expand All @@ -84,49 +78,20 @@ final class CdsLoadBalancer2 extends LoadBalancer {
private final LoadBalancerRegistry lbRegistry;
private CdsLbState rootCdsLbState;
private ResolvedAddresses resolvedAddresses;
private final BackoffPolicy.Provider backoffPolicyProvider;

CdsLoadBalancer2(Helper helper) {
this(helper, LoadBalancerRegistry.getDefaultRegistry(),
new ExponentialBackoffPolicy.Provider());
this(helper, LoadBalancerRegistry.getDefaultRegistry());
}

@VisibleForTesting
CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry,
BackoffPolicy.Provider backoffPolicyProvider) {
CdsLoadBalancer2(Helper helper, LoadBalancerRegistry lbRegistry) {
this.helper = checkNotNull(helper, "helper");
this.syncContext = checkNotNull(helper.getSynchronizationContext(), "syncContext");
this.lbRegistry = checkNotNull(lbRegistry, "lbRegistry");
this.backoffPolicyProvider = checkNotNull(backoffPolicyProvider, "backoffPolicyProvider");
logger = XdsLogger.withLogId(InternalLogId.allocate("cds-lb", helper.getAuthority()));
logger.log(XdsLogLevel.INFO, "Created");
}

/**
* Generates the config to be used in the priority LB policy for the single priority of
* logical DNS cluster.
*
* <p>priority LB -> cluster_impl LB (single hardcoded priority) -> pick_first
*/
static PriorityChildConfig generateDnsBasedPriorityChildConfig(
String cluster, @Nullable Bootstrapper.ServerInfo lrsServerInfo,
@Nullable Long maxConcurrentRequests,
@Nullable EnvoyServerProtoData.UpstreamTlsContext tlsContext,
Map<String, Struct> filterMetadata,
LoadBalancerRegistry lbRegistry, List<Endpoints.DropOverload> dropOverloads) {
// Override endpoint-level LB policy with pick_first for logical DNS cluster.
Object endpointLbConfig = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
lbRegistry.getProvider("pick_first"), null);
ClusterImplLoadBalancerProvider.ClusterImplConfig clusterImplConfig =
new ClusterImplLoadBalancerProvider.ClusterImplConfig(cluster, null, lrsServerInfo,
maxConcurrentRequests, dropOverloads, endpointLbConfig, tlsContext, filterMetadata);
LoadBalancerProvider clusterImplLbProvider =
lbRegistry.getProvider(XdsLbPolicies.CLUSTER_IMPL_POLICY_NAME);
Object clusterImplPolicy = GracefulSwitchLoadBalancer.createLoadBalancingPolicyConfig(
clusterImplLbProvider, clusterImplConfig);
return new PriorityChildConfig(clusterImplPolicy, false /* ignoreReresolution*/);
}

/**
* Generates configs to be used in the priority LB policy for priorities in an EDS cluster.
*
Expand Down Expand Up @@ -323,7 +288,7 @@ private final class ClusterResolverLbState extends LoadBalancer {


ClusterResolverLbState(Helper helper) {
this.helper = new RefreshableHelper(checkNotNull(helper, "helper"));
this.helper = checkNotNull(helper, "helper");
logger.log(XdsLogLevel.DEBUG, "New ClusterResolverLbState");
}

Expand All @@ -334,18 +299,13 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
(ClusterResolverConfig) resolvedAddresses.getLoadBalancingPolicyConfig();
endpointLbConfig = config.lbConfig;
for (DiscoveryMechanism instance : config.discoveryMechanisms) {
assert instance.type == DiscoveryMechanism.Type.EDS;
clusters.add(instance.cluster);
ClusterState state;
if (instance.type == DiscoveryMechanism.Type.EDS) {
state = new EdsClusterState(instance.cluster, instance.edsServiceName,
instance.endpointConfig,
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
instance.filterMetadata, instance.outlierDetection);
} else { // logical DNS
state = new LogicalDnsClusterState(instance.cluster, instance.dnsHostName,
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
instance.filterMetadata);
}
state = new EdsClusterState(instance.cluster, instance.edsServiceName,
instance.endpointConfig,
instance.lrsServerInfo, instance.maxConcurrentRequests, instance.tlsContext,
instance.filterMetadata, instance.outlierDetection);
clusterStates.put(instance.cluster, state);
state.start();
}
Expand Down Expand Up @@ -444,31 +404,6 @@ private void handleEndpointResolutionError() {
}
}

/**
* Wires re-resolution requests from downstream LB policies with DNS resolver.
*/
private final class RefreshableHelper extends ForwardingLoadBalancerHelper {
private final Helper delegate;

private RefreshableHelper(Helper delegate) {
this.delegate = checkNotNull(delegate, "delegate");
}

@Override
public void refreshNameResolution() {
for (ClusterState state : clusterStates.values()) {
if (state instanceof LogicalDnsClusterState) {
((LogicalDnsClusterState) state).refresh();
}
}
}

@Override
protected Helper delegate() {
return delegate;
}
}

/**
* Resolution state of an underlying cluster.
*/
Expand Down Expand Up @@ -676,170 +611,6 @@ void onError(final Status error) {
}
}

private final class LogicalDnsClusterState extends ClusterState {
private final String dnsHostName;
private final NameResolver.Factory nameResolverFactory;
private final NameResolver.Args nameResolverArgs;
private NameResolver resolver;
@Nullable
private BackoffPolicy backoffPolicy;
@Nullable
private SynchronizationContext.ScheduledHandle scheduledRefresh;

private LogicalDnsClusterState(String name, String dnsHostName,
@Nullable Bootstrapper.ServerInfo lrsServerInfo,
@Nullable Long maxConcurrentRequests,
@Nullable EnvoyServerProtoData.UpstreamTlsContext tlsContext,
Map<String, Struct> filterMetadata) {
super(name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata, null);
this.dnsHostName = checkNotNull(dnsHostName, "dnsHostName");
nameResolverFactory =
checkNotNull(helper.getNameResolverRegistry().asFactory(), "nameResolverFactory");
nameResolverArgs = checkNotNull(helper.getNameResolverArgs(), "nameResolverArgs");
}

@Override
void start() {
URI uri;
try {
uri = new URI("dns", "", "/" + dnsHostName, null);
} catch (URISyntaxException e) {
status = Status.INTERNAL.withDescription(
"Bug, invalid URI creation: " + dnsHostName).withCause(e);
handleEndpointResolutionError();
return;
}
resolver = nameResolverFactory.newNameResolver(uri, nameResolverArgs);
if (resolver == null) {
status = Status.INTERNAL.withDescription("Xds cluster resolver lb for logical DNS "
+ "cluster [" + name + "] cannot find DNS resolver with uri:" + uri);
handleEndpointResolutionError();
return;
}
resolver.start(new LogicalDnsClusterState.NameResolverListener(dnsHostName));
}

void refresh() {
if (resolver == null) {
return;
}
cancelBackoff();
resolver.refresh();
}

@Override
void shutdown() {
super.shutdown();
if (resolver != null) {
resolver.shutdown();
}
cancelBackoff();
}

private void cancelBackoff() {
if (scheduledRefresh != null) {
scheduledRefresh.cancel();
scheduledRefresh = null;
backoffPolicy = null;
}
}

private class DelayedNameResolverRefresh implements Runnable {
@Override
public void run() {
scheduledRefresh = null;
if (!shutdown) {
resolver.refresh();
}
}
}

private class NameResolverListener extends NameResolver.Listener2 {
private final String dnsHostName;

NameResolverListener(String dnsHostName) {
this.dnsHostName = dnsHostName;
}

@Override
public void onResult(final NameResolver.ResolutionResult resolutionResult) {
class NameResolved implements Runnable {
@Override
public void run() {
if (shutdown) {
return;
}
backoffPolicy = null; // reset backoff sequence if succeeded
// Arbitrary priority notation for all DNS-resolved endpoints.
String priorityName = priorityName(name, 0); // value doesn't matter
List<EquivalentAddressGroup> addresses = new ArrayList<>();
for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) {
// No weight attribute is attached, all endpoint-level LB policy should be able
// to handle such it.
String localityName = localityName(XdsNameResolver.LOGICAL_DNS_CLUSTER_LOCALITY);
Attributes attr = eag.getAttributes().toBuilder()
.set(XdsAttributes.ATTR_LOCALITY, XdsNameResolver.LOGICAL_DNS_CLUSTER_LOCALITY)
.set(XdsAttributes.ATTR_LOCALITY_NAME, localityName)
.set(XdsAttributes.ATTR_ADDRESS_NAME, dnsHostName)
.build();
eag = new EquivalentAddressGroup(eag.getAddresses(), attr);
eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName));
addresses.add(eag);
}
PriorityChildConfig priorityChildConfig =
generateDnsBasedPriorityChildConfig(
name, lrsServerInfo, maxConcurrentRequests, tlsContext, filterMetadata,
lbRegistry, Collections.<Endpoints.DropOverload>emptyList());
status = Status.OK;
resolved = true;
result = new ClusterResolutionResult(addresses, priorityName, priorityChildConfig);
handleEndpointResourceUpdate();
}
}

syncContext.execute(new NameResolved());
}

@Override
public void onError(final Status error) {
syncContext.execute(new Runnable() {
@Override
public void run() {
if (shutdown) {
return;
}
status = error;
// NameResolver.Listener API cannot distinguish between address-not-found and
// transient errors. If the error occurs in the first resolution, treat it as
// address not found. Otherwise, either there is previously resolved addresses
// previously encountered error, propagate the error to downstream/upstream and
// let downstream/upstream handle it.
if (!resolved) {
resolved = true;
handleEndpointResourceUpdate();
} else {
handleEndpointResolutionError();
}
if (scheduledRefresh != null && scheduledRefresh.isPending()) {
return;
}
if (backoffPolicy == null) {
backoffPolicy = backoffPolicyProvider.get();
}
long delayNanos = backoffPolicy.nextBackoffNanos();
logger.log(XdsLogLevel.DEBUG,
"Logical DNS resolver for cluster {0} encountered name resolution "
+ "error: {1}, scheduling DNS resolution backoff for {2} ns",
name, error, delayNanos);
scheduledRefresh =
syncContext.schedule(
new LogicalDnsClusterState.DelayedNameResolverRefresh(), delayNanos,
TimeUnit.NANOSECONDS, helper.getScheduledExecutorService());
}
});
}
}
}
}

static class ClusterResolutionResult {
Expand Down
Loading

0 comments on commit d72d674

Please sign in to comment.