Skip to content

Commit

Permalink
Merge branch 'grpc:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-safran authored Feb 1, 2024
2 parents 94366c4 + 3202370 commit abc4f87
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ public PickResult pickSubchannel(PickSubchannelArgs args) {
WeightedChildLbState wChild = (WeightedChildLbState) childLbState;
PickResult pickResult = childLbState.getCurrentPicker().pickSubchannel(args);
Subchannel subchannel = pickResult.getSubchannel();
if (subchannel == null) {
return pickResult;
}
if (!enableOobLoadReport) {
return PickResult.withSubchannel(subchannel,
OrcaPerRequestUtil.getInstance().newOrcaClientStreamTracerFactory(
Expand Down
24 changes: 14 additions & 10 deletions xds/src/main/java/io/grpc/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import io.grpc.ExperimentalApi;
import io.grpc.Status;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
Expand Down Expand Up @@ -115,15 +116,16 @@ public interface ResourceUpdate {}
/**
* Watcher interface for a single requested xDS resource.
*/
interface ResourceWatcher<T extends ResourceUpdate> {
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10862")
public interface ResourceWatcher<T extends ResourceUpdate> {

/**
* Called when the resource discovery RPC encounters some transient error.
*
* <p>Note that we expect that the implementer to:
* - Comply with the guarantee to not generate certain statuses by the library:
* https://grpc.github.io/grpc/core/md_doc_statuscodes.html. If the code needs to be
* propagated to the channel, override it with {@link Status.Code#UNAVAILABLE}.
* propagated to the channel, override it with {@link io.grpc.Status.Code#UNAVAILABLE}.
* - Keep {@link Status} description in one form or another, as it contains valuable debugging
* information.
*/
Expand Down Expand Up @@ -306,23 +308,25 @@ TlsContextManager getTlsContextManager() {
/**
* Registers a data watcher for the given Xds resource.
*/
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher,
Executor executor) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher,
Executor executor) {
throw new UnsupportedOperationException();
}

<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
watchXdsResource(type, resourceName, watcher, MoreExecutors.directExecutor());
}

/**
* Unregisters the given resource watcher.
*/
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
throw new UnsupportedOperationException();
}

Expand Down
13 changes: 7 additions & 6 deletions xds/src/main/java/io/grpc/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,10 @@ TlsContextManager getTlsContextManager() {
}

@Override
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher,
Executor watcherExecutor) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher,
Executor watcherExecutor) {
syncContext.execute(new Runnable() {
@Override
@SuppressWarnings("unchecked")
Expand All @@ -309,9 +310,9 @@ public void run() {
}

@Override
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
syncContext.execute(new Runnable() {
@Override
@SuppressWarnings("unchecked")
Expand Down
11 changes: 6 additions & 5 deletions xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -789,18 +789,19 @@ private final class FakeXdsClient extends XdsClient {

@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher, Executor syncContext) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher, Executor syncContext) {
assertThat(type.typeName()).isEqualTo("CDS");
watchers.computeIfAbsent(resourceName, k -> new ArrayList<>())
.add((ResourceWatcher<CdsUpdate>)watcher);
}

@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
assertThat(type.typeName()).isEqualTo("CDS");
assertThat(watchers).containsKey(resourceName);
List<ResourceWatcher<CdsUpdate>> watcherList = watchers.get(resourceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1184,19 +1184,20 @@ private static final class FakeXdsClient extends XdsClient {

@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {
assertThat(type.typeName()).isEqualTo("EDS");
assertThat(watchers).doesNotContainKey(resourceName);
watchers.put(resourceName, (ResourceWatcher<EdsUpdate>) watcher);
}

@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
assertThat(type.typeName()).isEqualTo("EDS");
assertThat(watchers).containsKey(resourceName);
watchers.remove(resourceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.grpc.LoadBalancer.Subchannel;
import io.grpc.LoadBalancer.SubchannelPicker;
import io.grpc.LoadBalancer.SubchannelStateListener;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.FakeClock;
import io.grpc.internal.TestUtils;
Expand Down Expand Up @@ -162,6 +163,22 @@ public ClientCall<OrcaLoadReportRequest, OrcaLoadReport> answer(
verify(helper, times(3)).createSubchannel(any(CreateSubchannelArgs.class));
}

@Test
public void pickChildLbTF() throws Exception {
syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder()
.setAddresses(servers.subList(0, 1)).setLoadBalancingPolicyConfig(weightedConfig)
.setAttributes(affinity).build()));
Iterator<Subchannel> it = subchannels.values().iterator();
Subchannel readySubchannel1 = it.next();
getSubchannelStateListener(readySubchannel1).onSubchannelState(ConnectivityStateInfo
.forTransientFailure(Status.UNAVAILABLE));
verify(helper).updateBalancingState(
eq(ConnectivityState.TRANSIENT_FAILURE), pickerCaptor.capture());
final WeightedRoundRobinPicker weightedPicker =
(WeightedRoundRobinPicker) pickerCaptor.getValue();
weightedPicker.pickSubchannel(mockArgs);
}

@Test
public void wrrLifeCycle() {
syncContext.execute(() -> wrr.acceptResolvedAddresses(ResolvedAddresses.newBuilder()
Expand Down
14 changes: 7 additions & 7 deletions xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1915,10 +1915,10 @@ BootstrapInfo getBootstrapInfo() {

@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType,
String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType,
String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {

switch (resourceType.typeName()) {
case "LDS":
Expand All @@ -1939,9 +1939,9 @@ <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType
}

@Override
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
switch (type.typeName()) {
case "LDS":
assertThat(ldsResource).isNotNull();
Expand Down
14 changes: 7 additions & 7 deletions xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ public BootstrapInfo getBootstrapInfo() {

@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType,
String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType,
String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {
switch (resourceType.typeName()) {
case "LDS":
assertThat(ldsWatcher).isNull();
Expand All @@ -201,9 +201,9 @@ <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType
}

@Override
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
switch (type.typeName()) {
case "LDS":
assertThat(ldsWatcher).isNotNull();
Expand Down

0 comments on commit abc4f87

Please sign in to comment.