Skip to content

Commit

Permalink
Merge pull request #1646 from smallrye/fix/flatmap-inner-cancellation
Browse files Browse the repository at this point in the history
fix: inner subscribers need to be cancelled (flatMap, merge, etc)
  • Loading branch information
jponge committed Jul 11, 2024
2 parents 1f727c5 + b8c4096 commit c0315b2
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,12 @@ private void cancelUpstream(boolean fromOnError) {
Subscription subscription = UPSTREAM_UPDATER.getAndSet(this, Subscriptions.CANCELLED);
if (subscription != null) {
subscription.cancel();
FlatMapInner<O>[] currentInners = inners.get();
for (FlatMapInner<O> inner : currentInners) {
if (inner != null) {
inner.cancel(false);
}
}
}
unsubscribe(fromOnError);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package io.smallrye.mutiny.operators;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.jupiter.api.Test;

import io.reactivex.rxjava3.core.Flowable;
import io.smallrye.mutiny.CompositeException;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.spies.MultiOnCancellationSpy;
import io.smallrye.mutiny.helpers.spies.Spy;
import io.smallrye.mutiny.helpers.test.AssertSubscriber;
Expand Down Expand Up @@ -200,4 +205,18 @@ public void testWithFailureCollectionWithMerge() {
.assertFailedWith(IllegalStateException.class, "boom");

}

@Test
public void failureMustTriggerCancellations() {
AtomicBoolean firstCancelled = new AtomicBoolean();
Multi<Integer> first = Multi.createBy().repeating().uni(
() -> Uni.createFrom().item(123).onItem().delayIt().by(Duration.ofSeconds(5))).atMost(10l)
.onCancellation().invoke(() -> firstCancelled.set(true));
Multi<Integer> second = Multi.createFrom().failure(new IOException("boom"));
AssertSubscriber<Integer> sub = Multi.createBy().merging().streams(first, second)
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));

sub.assertFailedWith(IOException.class, "boom");
await().atMost(Duration.ofSeconds(5)).until(firstCancelled::get);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ public void testRetryWithRandomBackoff() {
await().until(() -> subscriber.getItems().size() >= 10);
subscriber
.assertItems(0, 1, 0, 1, 0, 1, 0, 1, 0, 1) // Initial subscription + 4 retries
.awaitFailure()
.assertFailedWith(IOException.class, "boom retry")
.awaitFailure(t -> {
// expecting an IllegalStateException with an info about the retries made
Expand All @@ -347,6 +348,7 @@ public void testRetryWithRandomBackoffAndDefaultJitter() {
await().until(() -> subscriber.getItems().size() >= 10);
subscriber
.assertItems(0, 1, 0, 1, 0, 1, 0, 1, 0, 1) // Initial subscription + 4 retries
.awaitFailure()
.assertFailedWith(IOException.class, "boom retry")
.awaitFailure(t -> {
// expecting an IllegalStateException with an info about the retries made
Expand All @@ -371,6 +373,7 @@ public void testRetryWithDefaultMax() {
.until(() -> subscriber.getItems().size() >= 10);
subscriber
.assertItems(0, 1, 0, 1, 0, 1, 0, 1, 0, 1) // Initial subscription + 4 retries
.awaitFailure()
.assertFailedWith(IOException.class, "boom retry")
.awaitFailure(t -> {
// expecting an IllegalStateException with an info about the retries made
Expand Down

0 comments on commit c0315b2

Please sign in to comment.