-
ContextI am using a Multi for an event stream in a Quarkus project. Processing each item is complicated (takes a lot of time, calls several APIs in different services) and may fail. In that case, I would like to abort processing the rest of the items in the stream. I clarified in #697 how to abort processing the item stream, with the additional comment that throwing a RuntimeException to cancel the upstream seems to work only from DescriptionThis works great: public class Test {
private static final Logger LOG = Logger.getLogger(Test.class);
public void multiTest() {
List<String> items = Arrays.asList("one", "two", "three", "four", "five");
Multi<String> stream = Multi.createFrom().iterable(items);
Cancellable cancel = stream
.onItem().transform(item -> {
if (item.equals("two")) {
throw new RuntimeException();
}
LOG.infof("Transformed %s", item);
return item;
})
.onFailure().invoke(() -> {
LOG.info("Failed");
})
.subscribe().with(item -> {
LOG.infof("Handled %s", item);
});
}
} The output is the expected one:
However, if I have to call an async service from the transform callback, which might fail, I do not know how to cancel the item stream. I tried like this: public void multiTest() {
List<String> items = Arrays.asList("one", "two", "three");
Multi<String> stream = Multi.createFrom().iterable(items);
Cancellable cancel = stream
.onItem().transform(item -> {
// Simulate calling an async service
Uni.createFrom().item(1)
.onFailure().invoke(() -> {
LOG.info("Failed deep");
})
.subscribe().with(i -> {
LOG.infof("Deep %s", i);
if(item.equals("two"))
throw new RuntimeException();
});
LOG.infof("Transformed %s", item);
return item;
})
.onFailure().invoke(() -> {
LOG.info("Failed");
})
.subscribe().with(item -> {
LOG.infof("Handled %s", item);
});
} The output I get is:
Not to mention that I have to call a multitude of async services, dependent on each other, until I finish processing each item. How can I propagate the errors from those up to the transform callback, so that I can cancel the item stream? I can probably achieve this with some CompletableFuture on which I wait in the transform callback, but is there a cleaner, Mutiny-like way? For example I tried this too: public void multiTest() {
List<String> items = Arrays.asList("one", "two", "three");
Multi<String> stream = Multi.createFrom().iterable(items);
Cancellable cancel = stream
.onItem().transform(item -> {
CompletableFuture<Boolean> itemReady = new CompletableFuture<>();
// Simulate calling an async service
Uni.createFrom().item(1)
.onFailure().invoke(() -> {
LOG.info("Failed deep");
itemReady.complete(false);
})
.subscribe().with(i -> {
LOG.infof("Deep %s", i);
if(item.equals("two")) {
itemReady.complete(false);
return;
}
itemReady.complete(true);
});
itemReady.thenAccept(ready -> {
if(!ready) {
LOG.infof("Item %s handling had errors", item);
throw new RuntimeException();
}
});
LOG.infof("Transformed %s", item);
return item;
})
.onFailure().invoke(() -> {
LOG.info("Failed");
})
.subscribe().with(item -> {
LOG.infof("Handled %s", item);
});
} Which produces this output:
I realize this is not necessarily a Mutiny problem, by I am new to this reactive approach and would appreciate a push in the right direction. Thank you. |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
Instead of using |
Beta Was this translation helpful? Give feedback.
Instead of using
transform
(which is synchronous) and subscribing from there, you should look attransformToUni
.The returned
Uni
is then either with a value or a failure, depending on what the asynchronous operation it models did.