Skip to content

Commit

Permalink
Allow users to start watching xDS resources (grpc#10864)
Browse files Browse the repository at this point in the history
  • Loading branch information
anicr7 authored Feb 1, 2024
1 parent a97f21b commit 3202370
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 41 deletions.
24 changes: 14 additions & 10 deletions xds/src/main/java/io/grpc/xds/XdsClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Any;
import io.grpc.ExperimentalApi;
import io.grpc.Status;
import io.grpc.xds.Bootstrapper.ServerInfo;
import io.grpc.xds.LoadStatsManager2.ClusterDropStats;
Expand Down Expand Up @@ -115,15 +116,16 @@ public interface ResourceUpdate {}
/**
* Watcher interface for a single requested xDS resource.
*/
interface ResourceWatcher<T extends ResourceUpdate> {
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/10862")
public interface ResourceWatcher<T extends ResourceUpdate> {

/**
* Called when the resource discovery RPC encounters some transient error.
*
* <p>Note that we expect that the implementer to:
* - Comply with the guarantee to not generate certain statuses by the library:
* https://grpc.github.io/grpc/core/md_doc_statuscodes.html. If the code needs to be
* propagated to the channel, override it with {@link Status.Code#UNAVAILABLE}.
* propagated to the channel, override it with {@link io.grpc.Status.Code#UNAVAILABLE}.
* - Keep {@link Status} description in one form or another, as it contains valuable debugging
* information.
*/
Expand Down Expand Up @@ -306,23 +308,25 @@ TlsContextManager getTlsContextManager() {
/**
* Registers a data watcher for the given Xds resource.
*/
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher,
Executor executor) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher,
Executor executor) {
throw new UnsupportedOperationException();
}

<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
watchXdsResource(type, resourceName, watcher, MoreExecutors.directExecutor());
}

/**
* Unregisters the given resource watcher.
*/
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
throw new UnsupportedOperationException();
}

Expand Down
13 changes: 7 additions & 6 deletions xds/src/main/java/io/grpc/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,10 @@ TlsContextManager getTlsContextManager() {
}

@Override
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher,
Executor watcherExecutor) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher,
Executor watcherExecutor) {
syncContext.execute(new Runnable() {
@Override
@SuppressWarnings("unchecked")
Expand All @@ -309,9 +310,9 @@ public void run() {
}

@Override
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
syncContext.execute(new Runnable() {
@Override
@SuppressWarnings("unchecked")
Expand Down
11 changes: 6 additions & 5 deletions xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java
Original file line number Diff line number Diff line change
Expand Up @@ -789,18 +789,19 @@ private final class FakeXdsClient extends XdsClient {

@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher, Executor syncContext) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher, Executor syncContext) {
assertThat(type.typeName()).isEqualTo("CDS");
watchers.computeIfAbsent(resourceName, k -> new ArrayList<>())
.add((ResourceWatcher<CdsUpdate>)watcher);
}

@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
assertThat(type.typeName()).isEqualTo("CDS");
assertThat(watchers).containsKey(resourceName);
List<ResourceWatcher<CdsUpdate>> watcherList = watchers.get(resourceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1184,19 +1184,20 @@ private static final class FakeXdsClient extends XdsClient {

@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type, String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {
assertThat(type.typeName()).isEqualTo("EDS");
assertThat(watchers).doesNotContainKey(resourceName);
watchers.put(resourceName, (ResourceWatcher<EdsUpdate>) watcher);
}

@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
assertThat(type.typeName()).isEqualTo("EDS");
assertThat(watchers).containsKey(resourceName);
watchers.remove(resourceName);
Expand Down
14 changes: 7 additions & 7 deletions xds/src/test/java/io/grpc/xds/XdsNameResolverTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1915,10 +1915,10 @@ BootstrapInfo getBootstrapInfo() {

@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType,
String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType,
String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {

switch (resourceType.typeName()) {
case "LDS":
Expand All @@ -1939,9 +1939,9 @@ <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType
}

@Override
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
switch (type.typeName()) {
case "LDS":
assertThat(ldsResource).isNotNull();
Expand Down
14 changes: 7 additions & 7 deletions xds/src/test/java/io/grpc/xds/XdsServerTestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ public BootstrapInfo getBootstrapInfo() {

@Override
@SuppressWarnings("unchecked")
<T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType,
String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {
public <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType,
String resourceName,
ResourceWatcher<T> watcher,
Executor syncContext) {
switch (resourceType.typeName()) {
case "LDS":
assertThat(ldsWatcher).isNull();
Expand All @@ -201,9 +201,9 @@ <T extends ResourceUpdate> void watchXdsResource(XdsResourceType<T> resourceType
}

@Override
<T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
public <T extends ResourceUpdate> void cancelXdsResourceWatch(XdsResourceType<T> type,
String resourceName,
ResourceWatcher<T> watcher) {
switch (type.typeName()) {
case "LDS":
assertThat(ldsWatcher).isNotNull();
Expand Down

0 comments on commit 3202370

Please sign in to comment.