Cannot cancel Multi item stream #697
Context

I am using a Multi for an event stream in a Quarkus project. Processing each item is complicated (takes a lot of time, calls several APIs in different services) and may fail. In that case, I would like to abort processing the rest of the items in the stream. No matter what I do, I cannot seem to abort on a failed item.

Description

My approach:

    public class Test {
        private static final Logger LOG = Logger.getLogger(Test.class);

        public void multiTest() {
            List<String> items = Arrays.asList("one", "two", "three", "four", "five", "six", "seven", "eight", "nine");
            Multi<String> stream = Multi.createFrom().iterable(items);
            Cancellable cancel = stream
                    .onItem().transform(item -> {
                        if (item.equals("two")) {
                            //cancel.cancel();
                            return Multi.createFrom().failure(new Exception(item));
                        }
                        return item;
                    })
                    .onCompletion().invoke(() -> {
                        LOG.info("Completed");
                    })
                    .onFailure().invoke(() -> {
                        LOG.info("Failed");
                    })
                    .onCancellation().invoke(() -> {
                        LOG.info("Cancelled");
                    })
                    .subscribe().with(item -> {
                        // Trace items
                        if (item.getClass().equals(String.class)) {
                            String s = (String) item;
                            LOG.infof("Handled %s", s);
                        } else
                            LOG.infof("Stop item in stream");
                    });
        }
    }

The output is always this:
Q1: Shouldn't this stop after the second item in the stream? How can I achieve that?
Additional details

I tried using both Exception and RuntimeException in the "stop" item, no cigar. The idea to process items in the transform callback, so that I have a chance to replace an item with a "stop" item, came from this issue. The documentation is not helpful, I started reading closed issues on GitHub out of desperation. The commented ...
Replies: 3 comments 1 reply
-
I'm wondering how your example is compiling.
-
Thanks, this works. I explained above how I came to this attempt: the documentation was not helpful, so I guessed. My initial try was to throw an Exception, but I saw that that does not compile. I had no idea that throwing a RuntimeException would work.
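For anyone hitting the same compile error: the callback passed to transform is a java.util.function.Function, and Function.apply declares no checked exceptions, so the lambda body can only throw unchecked ones. A minimal plain-Java sketch of the usual workaround, wrapping the checked exception in a RuntimeException, might look like this. The names ProcessingException, process and SAFE_MAPPER are hypothetical and only stand in for the real processing step.

```java
import java.util.function.Function;

class LambdaExceptionSketch {

    // Hypothetical checked exception standing in for a failed API call.
    static class ProcessingException extends Exception {
        ProcessingException(String message) {
            super(message);
        }
    }

    // Hypothetical processing step that may fail with a checked exception.
    static String process(String item) throws ProcessingException {
        if (item.equals("two")) {
            throw new ProcessingException("cannot process " + item);
        }
        return item.toUpperCase();
    }

    // Function.apply has no throws clause, so the checked exception must be
    // caught inside the lambda. Rethrowing it wrapped in a RuntimeException
    // compiles and keeps the original exception available as the cause.
    static final Function<String, String> SAFE_MAPPER = item -> {
        try {
            return process(item);
        } catch (ProcessingException e) {
            throw new RuntimeException(e);
        }
    };

    public static void main(String[] args) {
        System.out.println(SAFE_MAPPER.apply("one"));
        try {
            SAFE_MAPPER.apply("two");
        } catch (RuntimeException e) {
            System.out.println("wrapped cause: " + e.getCause().getMessage());
        }
    }
}
```

A mapper written this way can be passed wherever a Function is expected, and the original checked exception can still be inspected through getCause().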
-
Can I throw a RuntimeException from any handler to cancel (the up)stream?
I'm wondering how your example is compiling.
Instead of `return Multi.createFrom().failure(new Exception(item));`, throw a `RuntimeException`.
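A minimal sketch of that suggestion, assuming Mutiny (io.smallrye.mutiny) is on the classpath. The class name AbortOnFailureSketch, the run() helper and the trace list are made up for illustration, and the item list is shortened. The transform callback throws an unchecked exception for the failing item, and the subscriber records what it observes through the three-argument subscribe().with(...) overload.

```java
import io.smallrye.mutiny.Multi;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AbortOnFailureSketch {

    // Runs the stream and returns a trace of the events the subscriber saw.
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        List<String> items = Arrays.asList("one", "two", "three", "four");

        Multi.createFrom().iterable(items)
                .onItem().transform(item -> {
                    if (item.equals("two")) {
                        // Throwing here turns the item into a failure event
                        // instead of passing a "stop" object downstream.
                        throw new IllegalStateException("cannot process " + item);
                    }
                    return item;
                })
                .subscribe().with(
                        item -> trace.add("item:" + item),
                        failure -> trace.add("failure:" + failure.getMessage()),
                        () -> trace.add("completed"));

        // The iterable source emits on the subscribing thread, so the trace
        // is already filled in when subscribe().with(...) returns.
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With this shape the subscriber should see "one", then the failure, and none of the later items, since a failure is a terminal event for the stream. The lambda also keeps its String return type, so the getClass() check in the subscriber is no longer needed.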