diff --git a/mutiny-zero/src/main/java/mutiny/zero/internal/CompletionStagePublisher.java b/mutiny-zero/src/main/java/mutiny/zero/internal/CompletionStagePublisher.java index 01f3c65..2d9b6fc 100644 --- a/mutiny-zero/src/main/java/mutiny/zero/internal/CompletionStagePublisher.java +++ b/mutiny-zero/src/main/java/mutiny/zero/internal/CompletionStagePublisher.java @@ -4,11 +4,10 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Flow; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; public class CompletionStagePublisher implements Publisher { @@ -22,55 +21,45 @@ public CompletionStagePublisher(Supplier> completionStageSupp @Override public void subscribe(Subscriber subscriber) { requireNonNull(subscriber, "The subscriber cannot be null"); - CompletionStage cs = completionStageSupplier.get(); - if (cs == null) { - subscriber.onSubscribe(new AlreadyCompletedSubscription()); - subscriber.onError(new NullPointerException("The completion stage is null")); - return; - } - CompletableFuture completableFuture = cs.toCompletableFuture(); - if (completableFuture.isDone()) { - subscriber.onSubscribe(new AlreadyCompletedSubscription()); - try { - T value = completableFuture.get(); - if (value == null) { - subscriber.onError(new NullPointerException("The CompletionStage produced a null value")); - } else { - subscriber.onNext(value); - subscriber.onComplete(); - } - } catch (InterruptedException e) { - subscriber.onError(e); - } catch (ExecutionException e) { - subscriber.onError(e.getCause()); - } - } else { - subscriber.onSubscribe(new CompletionStageSubscription(subscriber, completableFuture)); - } + subscriber.onSubscribe(new CompletionStageSubscription<>(completionStageSupplier, subscriber)); } - private class CompletionStageSubscription implements Flow.Subscription { + private static class CompletionStageSubscription implements Flow.Subscription { + private final Supplier> completionStageSupplier; private final Subscriber subscriber; - private final CompletableFuture completableFuture; - private final AtomicBoolean cancelled = new AtomicBoolean(); + private final AtomicReference state = new AtomicReference<>(State.INIT); + private CompletableFuture completableFuture; - private CompletionStageSubscription(Subscriber subscriber, CompletableFuture completableFuture) { + enum State { + INIT, + ACTIVE, + DONE + } + + public CompletionStageSubscription(Supplier> completionStageSupplier, + Subscriber subscriber) { + this.completionStageSupplier = completionStageSupplier; this.subscriber = subscriber; - this.completableFuture = completableFuture; } @Override public void request(long n) { - if (cancelled.get()) { - return; - } if (n <= 0L) { cancel(); subscriber.onError(Helper.negativeRequest(n)); - } else { + return; + } + if (state.compareAndSet(State.INIT, State.ACTIVE)) { + CompletionStage cs = completionStageSupplier.get(); + if (cs == null) { + state.set(State.DONE); + subscriber.onError(new NullPointerException("The completion stage is null")); + return; + } + completableFuture = cs.toCompletableFuture(); completableFuture.whenComplete((value, err) -> { - if (cancelled.compareAndSet(false, true)) { + if (state.getAndSet(State.DONE) == State.ACTIVE) { if (err != null) { subscriber.onError(err); } else if (value == null) { @@ -86,8 +75,11 @@ public void request(long n) { @Override public void cancel() { - completableFuture.cancel(false); - cancelled.set(true); + if (state.getAndSet(State.DONE) != State.DONE) { + if (completableFuture != null) { + completableFuture.cancel(false); + } + } } } } diff --git a/mutiny-zero/src/test/java/mutiny/zero/ZeroPublisherTest.java b/mutiny-zero/src/test/java/mutiny/zero/ZeroPublisherTest.java index fadc114..f058b61 100644 --- a/mutiny-zero/src/test/java/mutiny/zero/ZeroPublisherTest.java +++ b/mutiny-zero/src/test/java/mutiny/zero/ZeroPublisherTest.java @@ -189,6 +189,38 @@ void fromDeferredNullValue() { sub.assertFailedWith(NullPointerException.class, "null value"); } + @Test + @DisplayName("Null CompletionStage") + void nullCompletionStage() { + AssertSubscriber sub = AssertSubscriber.create(10); + ZeroPublisher.fromCompletionStage(() -> null).subscribe(sub); + + sub.awaitFailure(Duration.ofSeconds(5)); + sub.assertFailedWith(NullPointerException.class, "The completion stage is null"); + } + + @Test + @DisplayName("Upfront CompletionStage cancellation") + void upfrontCancellation() { + AssertSubscriber sub = AssertSubscriber.create(); + ZeroPublisher.fromCompletionStage(() -> CompletableFuture.completedFuture(58)).subscribe(sub); + + sub.assertHasNotReceivedAnyItem().assertNotTerminated(); + sub.cancel(); + sub.request(10L); + sub.assertHasNotReceivedAnyItem().assertNotTerminated(); + } + + @Test + @DisplayName("Reject negative requests") + void rejectNegativeRequest() { + AssertSubscriber sub = AssertSubscriber.create(); + ZeroPublisher.fromCompletionStage(() -> CompletableFuture.completedFuture(58)).subscribe(sub); + + sub.request(0L); + sub.assertFailedWith(IllegalArgumentException.class, "(non-positive subscription request)"); + } + @Test @DisplayName("Publisher to CompletionStage (value)") void publisherToCompletionStageOk() { diff --git a/mutiny-zero/src/test/java/mutiny/zero/tck/CompletionStageTckPublisherTest.java b/mutiny-zero/src/test/java/mutiny/zero/tck/CompletionStageTckPublisherTest.java index 72d478e..ffb1e34 100644 --- a/mutiny-zero/src/test/java/mutiny/zero/tck/CompletionStageTckPublisherTest.java +++ b/mutiny-zero/src/test/java/mutiny/zero/tck/CompletionStageTckPublisherTest.java @@ -1,6 +1,5 @@ package mutiny.zero.tck; -import java.io.IOException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Flow.Publisher; @@ -22,11 +21,7 @@ public Publisher createFlowPublisher(long elements) { @Override public Publisher createFailedFlowPublisher() { - return ZeroPublisher.fromCompletionStage(() -> { - CompletableFuture future = new CompletableFuture<>(); - future.completeExceptionally(new IOException("boom")); - return future; - }); + return null; } @Override