Skip to content

Commit

Permalink
CompletionStage publisher rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
jponge committed Mar 14, 2024
1 parent 2183aea commit b1b3a02
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class CompletionStagePublisher<T> implements Publisher<T> {
Expand All @@ -22,55 +21,45 @@ public CompletionStagePublisher(Supplier<CompletionStage<T>> completionStageSupp
@Override
public void subscribe(Subscriber<? super T> subscriber) {
requireNonNull(subscriber, "The subscriber cannot be null");
CompletionStage<T> cs = completionStageSupplier.get();
if (cs == null) {
subscriber.onSubscribe(new AlreadyCompletedSubscription());
subscriber.onError(new NullPointerException("The completion stage is null"));
return;
}
CompletableFuture<T> completableFuture = cs.toCompletableFuture();
if (completableFuture.isDone()) {
subscriber.onSubscribe(new AlreadyCompletedSubscription());
try {
T value = completableFuture.get();
if (value == null) {
subscriber.onError(new NullPointerException("The CompletionStage produced a null value"));
} else {
subscriber.onNext(value);
subscriber.onComplete();
}
} catch (InterruptedException e) {
subscriber.onError(e);
} catch (ExecutionException e) {
subscriber.onError(e.getCause());
}
} else {
subscriber.onSubscribe(new CompletionStageSubscription(subscriber, completableFuture));
}
subscriber.onSubscribe(new CompletionStageSubscription<>(completionStageSupplier, subscriber));
}

private class CompletionStageSubscription implements Flow.Subscription {
private static class CompletionStageSubscription<T> implements Flow.Subscription {

private final Supplier<CompletionStage<T>> completionStageSupplier;
private final Subscriber<? super T> subscriber;
private final CompletableFuture<T> completableFuture;
private final AtomicBoolean cancelled = new AtomicBoolean();
private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
private CompletableFuture<T> completableFuture;

private CompletionStageSubscription(Subscriber<? super T> subscriber, CompletableFuture<T> completableFuture) {
enum State {
INIT,
ACTIVE,
DONE
}

public CompletionStageSubscription(Supplier<CompletionStage<T>> completionStageSupplier,
Subscriber<? super T> subscriber) {
this.completionStageSupplier = completionStageSupplier;
this.subscriber = subscriber;
this.completableFuture = completableFuture;
}

@Override
public void request(long n) {
if (cancelled.get()) {
return;
}
if (n <= 0L) {
cancel();
subscriber.onError(Helper.negativeRequest(n));
} else {
return;
}
if (state.compareAndSet(State.INIT, State.ACTIVE)) {
CompletionStage<T> cs = completionStageSupplier.get();
if (cs == null) {
state.set(State.DONE);
subscriber.onError(new NullPointerException("The completion stage is null"));
return;
}
completableFuture = cs.toCompletableFuture();
completableFuture.whenComplete((value, err) -> {
if (cancelled.compareAndSet(false, true)) {
if (state.getAndSet(State.DONE) == State.ACTIVE) {
if (err != null) {
subscriber.onError(err);
} else if (value == null) {
Expand All @@ -86,8 +75,11 @@ public void request(long n) {

@Override
public void cancel() {
completableFuture.cancel(false);
cancelled.set(true);
if (state.getAndSet(State.DONE) != State.DONE) {
if (completableFuture != null) {
completableFuture.cancel(false);
}
}
}
}
}
32 changes: 32 additions & 0 deletions mutiny-zero/src/test/java/mutiny/zero/ZeroPublisherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,38 @@ void fromDeferredNullValue() {
sub.assertFailedWith(NullPointerException.class, "null value");
}

@Test
@DisplayName("Null CompletionStage")
void nullCompletionStage() {
AssertSubscriber<Object> sub = AssertSubscriber.create(10);
ZeroPublisher.fromCompletionStage(() -> null).subscribe(sub);

sub.awaitFailure(Duration.ofSeconds(5));
sub.assertFailedWith(NullPointerException.class, "The completion stage is null");
}

@Test
@DisplayName("Upfront CompletionStage cancellation")
void upfrontCancellation() {
AssertSubscriber<Object> sub = AssertSubscriber.create();
ZeroPublisher.fromCompletionStage(() -> CompletableFuture.completedFuture(58)).subscribe(sub);

sub.assertHasNotReceivedAnyItem().assertNotTerminated();
sub.cancel();
sub.request(10L);
sub.assertHasNotReceivedAnyItem().assertNotTerminated();
}

@Test
@DisplayName("Reject negative requests")
void rejectNegativeRequest() {
AssertSubscriber<Object> sub = AssertSubscriber.create();
ZeroPublisher.fromCompletionStage(() -> CompletableFuture.completedFuture(58)).subscribe(sub);

sub.request(0L);
sub.assertFailedWith(IllegalArgumentException.class, "(non-positive subscription request)");
}

@Test
@DisplayName("Publisher to CompletionStage (value)")
void publisherToCompletionStageOk() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package mutiny.zero.tck;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;

Expand All @@ -22,11 +21,7 @@ public Publisher<Long> createFlowPublisher(long elements) {

@Override
public Publisher<Long> createFailedFlowPublisher() {
return ZeroPublisher.fromCompletionStage(() -> {
CompletableFuture<Long> future = new CompletableFuture<>();
future.completeExceptionally(new IOException("boom"));
return future;
});
return null;
}

@Override
Expand Down

0 comments on commit b1b3a02

Please sign in to comment.