From c0f359dbeae676a6ec017a9390385b1140f140bc Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Wed, 10 Jul 2024 22:05:08 +0200 Subject: [PATCH 1/2] fix(tests): await failures before asserting them This fixes timing issues and possibly flaky tests. --- .../smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java index e95c7023e..773ae4818 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiOnFailureRetryWhenTest.java @@ -323,6 +323,7 @@ public void testRetryWithRandomBackoff() { await().until(() -> subscriber.getItems().size() >= 10); subscriber .assertItems(0, 1, 0, 1, 0, 1, 0, 1, 0, 1) // Initial subscription + 4 retries + .awaitFailure() .assertFailedWith(IOException.class, "boom retry") .awaitFailure(t -> { // expecting an IllegalStateException with an info about the retries made @@ -347,6 +348,7 @@ public void testRetryWithRandomBackoffAndDefaultJitter() { await().until(() -> subscriber.getItems().size() >= 10); subscriber .assertItems(0, 1, 0, 1, 0, 1, 0, 1, 0, 1) // Initial subscription + 4 retries + .awaitFailure() .assertFailedWith(IOException.class, "boom retry") .awaitFailure(t -> { // expecting an IllegalStateException with an info about the retries made @@ -371,6 +373,7 @@ public void testRetryWithDefaultMax() { .until(() -> subscriber.getItems().size() >= 10); subscriber .assertItems(0, 1, 0, 1, 0, 1, 0, 1, 0, 1) // Initial subscription + 4 retries + .awaitFailure() .assertFailedWith(IOException.class, "boom retry") .awaitFailure(t -> { // expecting an IllegalStateException with an info about the retries made From b8c40965882b0717f65b56f83f5e20c3579eeadd Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Wed, 10 Jul 2024 22:07:02 +0200 Subject: [PATCH 2/2] fix: ensure inner subscribers get cancelled on immediate inner failure This has been reported as a discussion in https://github.com/smallrye/smallrye-mutiny/discussions/1642 --- .../operators/multi/MultiFlatMapOp.java | 6 ++++++ .../mutiny/operators/MultiMergeTest.java | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiFlatMapOp.java b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiFlatMapOp.java index a7f6aa69c..f3dd144be 100755 --- a/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiFlatMapOp.java +++ b/implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiFlatMapOp.java @@ -423,6 +423,12 @@ private void cancelUpstream(boolean fromOnError) { Subscription subscription = UPSTREAM_UPDATER.getAndSet(this, Subscriptions.CANCELLED); if (subscription != null) { subscription.cancel(); + FlatMapInner[] currentInners = inners.get(); + for (FlatMapInner inner : currentInners) { + if (inner != null) { + inner.cancel(false); + } + } } unsubscribe(fromOnError); } diff --git a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiMergeTest.java b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiMergeTest.java index d26635bd3..42a19336e 100644 --- a/implementation/src/test/java/io/smallrye/mutiny/operators/MultiMergeTest.java +++ b/implementation/src/test/java/io/smallrye/mutiny/operators/MultiMergeTest.java @@ -1,15 +1,20 @@ package io.smallrye.mutiny.operators; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import java.io.IOException; +import java.time.Duration; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.Test; import io.reactivex.rxjava3.core.Flowable; import io.smallrye.mutiny.CompositeException; import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.Uni; import io.smallrye.mutiny.helpers.spies.MultiOnCancellationSpy; import io.smallrye.mutiny.helpers.spies.Spy; import io.smallrye.mutiny.helpers.test.AssertSubscriber; @@ -200,4 +205,18 @@ public void testWithFailureCollectionWithMerge() { .assertFailedWith(IllegalStateException.class, "boom"); } + + @Test + public void failureMustTriggerCancellations() { + AtomicBoolean firstCancelled = new AtomicBoolean(); + Multi first = Multi.createBy().repeating().uni( + () -> Uni.createFrom().item(123).onItem().delayIt().by(Duration.ofSeconds(5))).atMost(10l) + .onCancellation().invoke(() -> firstCancelled.set(true)); + Multi second = Multi.createFrom().failure(new IOException("boom")); + AssertSubscriber sub = Multi.createBy().merging().streams(first, second) + .subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE)); + + sub.assertFailedWith(IOException.class, "boom"); + await().atMost(Duration.ofSeconds(5)).until(firstCancelled::get); + } }