-
I have a question on how to handle this scenario from a reactive RESTful endpoint. Usually I can kludge my way through but this one is stumping me. What I am trying to do is:
I know this code is completely wrong (there are all kinds of things I was trying to do and now it is just in a complete state of chaos) but it sort of demonstrates what I would like to do: public Uni<List<InstanceInfo>> getStatus(@BeanParam final ClusterParams params) {
return tenantClient.getClient(params)
.chain(c -> this.clusterService.getInstances(params.getInstanceId(), c)
.map(i -> clusterMapper.mapInstanceResponse(i))
.invoke(r -> {
this.getDeploymentsByTag(r.getTag(), r.getClient()).forEach(d -> {
final Deployment deployment = d.get();
final boolean ready = Readiness.isDeploymentReady(deployment);
final InstanceInfo instance = new InstanceInfo();
instance.setDomain(r.getDomain());
instance.setName(deployment.getMetadata().getName());
instance.setReady(ready);
r.getInstances().add(instance);
});
})
.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
.onItem().transformToMultiAndMerge(i -> Multi.createFrom().iterable(i.getInstances()))
.collect().asList());
} Stream<RollableScalableResource<Deployment>> getDeploymentsByTag(final String tag, final String client) {
return kubernetesClient.apps().deployments()
.inNamespace(client)
.withLabel(TAG_LABEL, tag)
.resources();
} This is the test I am writing against it if it helps give more context. @Test
public void should_get_status() {
clusterInstance.setTag("my-tag-1");
List<InstanceInfo> actual = given()
.pathParam("client", "my-client")
.pathParam("instance", "my-tag-1")
.contentType(ContentType.JSON)
.when()
.get("/{client}/instances/{instance}/status")
.then()
.statusCode(200)
.extract().body()
.jsonPath().getList(".", InstanceInfo.class);
assertNotNull(actual);
} As a final note, I am sure I am doing this wrong (even when I had it working) because I get a blocked thread when running natively in Quarkus where locally my test does not throw any issues. |
Beta Was this translation helpful? Give feedback.
Replies: 3 comments
-
Adding more context, I am trying to rewrite this original method someone else wrote. It worked but didnt look right (and it only handled a single item not multiple): @GET
@Blocking
@Path("/{client}/instances/{instance}/status")
@Produces(APPLICATION_JSON)
public Uni<InstanceResponse> getStatus(@BeanParam final ClusterParams params) {
// Can't do the kubectlClient lookup inside chained lambda call. It's fine
// outside of it though
final List<Deployment> deployments = kubernetesClient.apps().deployments()
.inNamespace(params.getClient()).list().getItems();
log.debug("Found {} deployments in {}", deployments.size(), params.getClient());
return tenantClient.getClient(params)
.chain(c -> this.clusterService.getInstance(params.getInstanceId(), c))
.chain(i -> {
checkDeploymentStatus(deployments, i);
return Uni.createFrom().item(i);
})
.onFailure().retry().atMost(STATUS_MAX_TRIES)
.map(i -> this.clusterMapper.mapInstanceResponse(i));
} |
Beta Was this translation helpful? Give feedback.
-
This is my first stab at it. It works from the test case but when deployed/native I get a thread blocked on what appears to be the "list" from the Kubernetes client. So i am confused how to handle the call to the Kubernetes client "list" of the deployments "reactively" @GET
@Path("/{client}/instances/{instance}/status")
@Produces(APPLICATION_JSON)
public Uni<List<InstanceInfo>> getStatus(@BeanParam final ClusterParams params) {
return tenantClient.getClient(params)
.chain(c -> this.clusterService.getInstances(params.getInstanceId(), c)
.map(i -> this.clusterMapper.mapInstanceResponse(i)).toUni()
.chain(r -> Uni.createFrom().item(this.getDeploymentsByTag(r.getTag(), r.getClient())))
.onItem().transformToMulti(d -> Multi.createFrom().iterable(d.getItems()))
.map(d -> {
final boolean ready = Readiness.isDeploymentReady(d);
final InstanceInfo instance = new InstanceInfo();
instance.setDomain(c.getUri());
instance.setName(d.getMetadata().getName());
instance.setReady(ready);
return instance;
})
//.runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
.collect().asList());
} DeploymentList getDeploymentsByTag(final String tag, final String client) {
return kubernetesClient.apps().deployments()
.inNamespace(client)
.withLabel(TAG_LABEL, tag)
.list();
} I seem to have two issues. Without the
Looking at the way Stork/Kubernetes discovery does it - would this be the right approach with the emitter? |
Beta Was this translation helpful? Give feedback.
-
Here is my latest pass and it seems to work! Is this the right way or is there a simpler way to do this? I could not get it without injecting Vertx (an i literally clipped it from the Stork/Kubernetes Discovery code). @GET
@Path("/{client}/instances/{instance}/status")
@Produces(APPLICATION_JSON)
public Uni<List<InstanceInfo>> getStatus(@BeanParam final ClusterParams params) {
return tenantClient.getClient(params)
.chain(c -> this.clusterService.getInstances(params.getInstanceId(), c)
.map(i -> this.clusterMapper.mapInstanceResponse(i)).toUni()
.chain(r -> getDeployments(r.getTag(), r.getClient()))
.onItem().transformToMulti(d -> Multi.createFrom().iterable(d.getItems()))
.map(d -> {
final boolean ready = Readiness.isDeploymentReady(d);
final InstanceInfo instance = new InstanceInfo();
instance.setDomain(c.getUri());
instance.setName(d.getMetadata().getName());
instance.setReady(ready);
log.debug("Returning {} instance info", instance.getName());
return instance;
})
.collect().asList());
} private Uni<DeploymentList> getDeployments(final String tag, final String client) {
return Uni.createFrom().emitter(emitter -> {
vertx.executeBlocking(future -> {
final DeploymentList list = kubernetesClient.apps().deployments()
.inNamespace(client)
.withLabel(TAG_LABEL, tag)
.list();
future.complete(list);
}, result -> {
if (result.succeeded()) {
final DeploymentList list = (DeploymentList)result.result();
emitter.complete(list);
}
else {
emitter.fail(result.cause());
}
});
});
} |
Beta Was this translation helpful? Give feedback.
Here is my latest pass and it seems to work! Is this the right way or is there a simpler way to do this?
I could not get it without injecting Vertx (an i literally clipped it from the Stork/Kubernetes Discovery code).