Skip to content

Commit

Permalink
Motivation:
Browse files Browse the repository at this point in the history
io.vertx.core.Future does not use variance in its declarations preventing reuse or forcing to introduce adapter functions, e.g.

public void flatMap(Future<String> fut, Function<CharSequence, Future<Integer>> fn) {
  fut.flatMap(fn::apply); // produce a new function adapting "fn", it could be "fn" instead
}

public void onComplete(Future<String> fut, Promise<CharSequence> promise) {
  // That
  fut.onComplete(event -> promise.handle(event.map(s -> s))); // we would like to use "promise" instead

  // Or
  fut.map(s -> (CharSequence)s).onComplete(promise);
}

Changes:

Three changes are necessary to solve this problem at the expense of small breaking changes

1. Use variance on existing methods

// After
<U> flatMap(Function<T, Future<U>> fn);
// After
<U> flatMap(Function<? super T, Future<U>> fn);

This changes remains source compatible (for user), there are breaking changes but those are for implementation of the `Future` type, which are acceptable.

It does not apply to Handler<AsyncResult<T>> arguments. In practice Handler<AsyncResult<T>> is solvable but not realistic, it should be Handler<? extends AsyncResult<? super T>> but this one exhibit issues with lambdas, e.g. future.onComplete(ar -> ar.result() /* Object and not T */).

2. Overloading handler of async results methods

Use a functionnal interface accepting two arguments so a lambda will get the value and the error instead of the combined async result, pretty much like CompletionStage#whenComplete(BiConsumer<? super T, ? super Throwable>).

Future<T> onComplete(Handler<AsyncResult<T>> handler);
Future<T> onComplete(Completable<? super T> handler); // Overload

Likewise

<U> Future<U> transform(Function<AsyncResult<T>, Future<U>> fn);
<U> Future<U> transform(BiFunction<? super T, ? super Throwable, Future<U>> fn); // Overload

with

@FunctionalInterface
public interface Completable<T> {
  default void succeed(T value) {
    complete(value, null);
  }
  default void fail(Throwable cause) {
    complete(null, cause);
  }
  void complete(T value, Throwable err);
}

There are a few breaking changes though, when null is an argument, the compiler cannot decide with overload to use. We consider those are marginal, we found some of these in the vertx test suite to check that null arguments produces errors, e.g. `future.onComplete(null)`.

3. Retrofit Promise as Completable instead of async result handler

Since now we have variant part with a Completable, we need Promise to extend Completable instead of Handler<AsyncResult<T>>

Promise<T> extends Completable<T> {
  ...
}

This creates breaking changes when Promise<T> was used at the place of Handler<AsyncResult<T>>, e.g. internally in vertx we still have a few of those, and promise::handle should be used to fix the issues. Since Vert.x 5 does not anymore exhibit Handler<AsyncResult<T>> methods, this should not be an issue for users.

This will require adaptation in Vert.x code base because there are many remaining usage of `Handler<AsyncResult<T>>` idiom: here is a recap of the necessary changes https://gist.github.com/vietj/02ce9dc89c8a1c11dabe8828f760f973 . This list is actually quite long, however it mostly solves internal issues of the vertx codebase still using Vert.x 3 idioms and was never migrated to use futures, e.g. recent vertx components like the new _vertx-grpc_ or _vertx-service-resolver_ do not need any changes.
  • Loading branch information
vietj committed Sep 12, 2024
1 parent 9801071 commit 79edb86
Show file tree
Hide file tree
Showing 38 changed files with 497 additions and 608 deletions.
8 changes: 0 additions & 8 deletions vertx-core/src/main/asciidoc/futures.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,6 @@ They allow you to defer the action of providing a result.
In most cases, you don't need to create promises yourself in a Vert.x application.
<<_future_composition>> and <<_future_coordination>> provide you with the tools to transform and merge asynchronous results.
However, if, in your codebase, you have legacy methods which use callbacks, you can leverage the fact that a promise extends {@link io.vertx.core.Handler io.vertx.core.Handler<io.vertx.core.AsyncResult>}:
[source,$lang]
----
{@link examples.CoreExamples#promiseAsHandler}
----
====

[CAUTION]
====
Terminal operations like `onSuccess`, `onFailure` and `onComplete` provide no guarantee whatsoever regarding the invocation order of callbacks.
Expand Down
4 changes: 0 additions & 4 deletions vertx-core/src/main/java/examples/CoreExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,6 @@ public void exampleFuture1(Vertx vertx, Handler<HttpServerRequest> requestHandle
});
}

public void promiseAsHandler() {
Future<String> greeting = Future.future(promise -> legacyGreetAsync(promise));
}

public void promiseCallbackOrder(Future<Void> future) {
future.onComplete(ar -> {
// Do something
Expand Down
2 changes: 1 addition & 1 deletion vertx-core/src/main/java/io/vertx/core/AsyncResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public interface AsyncResult<T> {
* @param mapper the mapper function
* @return the mapped async result
*/
default <U> AsyncResult<U> map(Function<T, U> mapper) {
default <U> AsyncResult<U> map(Function<? super T, U> mapper) {
if (mapper == null) {
throw new NullPointerException();
}
Expand Down
77 changes: 77 additions & 0 deletions vertx-core/src/main/java/io/vertx/core/Completable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright (c) 2011-2024 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core;

import io.vertx.core.impl.NoStackTraceThrowable;

/**
* A view of something that can be completed with a success or failure.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
@FunctionalInterface
public interface Completable<T> {

/**
* Set the result. The instance will be marked as succeeded and completed.
* <p/>
*
* @param result the result
* @throws IllegalStateException when this instance is already completed or failed
*/
default void succeed(T result) {
complete(result, null);
}

/**
* Shortcut for {@code succeed(null)}
*
* @throws IllegalStateException when this instance is already completed or failed
*/
default void succeed() {
complete(null, null);
}

/**
* Set the failure. This instance will be marked as failed and completed.
*
* @param failure the failure
* @throws IllegalStateException when this instance is already completed or failed
*/
default void fail(Throwable failure) {
complete(null, failure);
}

/**
* Calls {@link #fail(Throwable)} with the {@code message}.
*
* @param message the failure message
* @throws IllegalStateException when this instance is already completed or failed
*/
default void fail(String message) {
complete(null, new NoStackTraceThrowable(message));
}

/**
* Complete this instance
*
* <ul>
* <li>when {@code failure} is {@code null}, a success is signaled</li>
* <li>otherwise a failure is signaled</li>
* </ul>
*
* @param result the result
* @param failure the failure
* @throws IllegalStateException when this instance is already completed
*/
void complete(T result, Throwable failure);

}
4 changes: 2 additions & 2 deletions vertx-core/src/main/java/io/vertx/core/CompositeFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ public interface CompositeFuture extends Future<CompositeFuture> {
CompositeFuture onComplete(Handler<AsyncResult<CompositeFuture>> handler);

@Override
default CompositeFuture onSuccess(Handler<CompositeFuture> handler) {
default CompositeFuture onSuccess(Handler<? super CompositeFuture> handler) {
Future.super.onSuccess(handler);
return this;
}

@Override
default CompositeFuture onFailure(Handler<Throwable> handler) {
default CompositeFuture onFailure(Handler<? super Throwable> handler) {
Future.super.onFailure(handler);
return this;
}
Expand Down
65 changes: 54 additions & 11 deletions vertx-core/src/main/java/io/vertx/core/Future.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -280,7 +280,7 @@ static <T> Future<T> failedFuture(String failureMessage) {
* @param failureHandler the handler that will be called with the failed result
* @return a reference to this, so it can be used fluently
*/
default Future<T> onComplete(Handler<T> successHandler, Handler<Throwable> failureHandler) {
default Future<T> onComplete(Handler<? super T> successHandler, Handler<? super Throwable> failureHandler) {
return onComplete(ar -> {
if (successHandler != null && ar.succeeded()) {
successHandler.handle(ar.result());
Expand All @@ -290,6 +290,21 @@ default Future<T> onComplete(Handler<T> successHandler, Handler<Throwable> failu
});
}

/**
* Add handlers to be notified on succeeded result and failed result.
* <p>
* <em><strong>WARNING</strong></em>: this is a terminal operation.
* If several {@code handler}s are registered, there is no guarantee that they will be invoked in order of registration.
*
* @param handler the handler that will be called with the completion outcome
* @return a reference to this, so it can be used fluently
*/
default Future<T> onComplete(Completable<? super T> handler) {
return onComplete(ar -> {
handler.complete(ar.succeeded() ? ar.result() : null, ar.failed() ? ar.cause() : null);
});
}

/**
* Add a handler to be notified of the succeeded result.
* <p>
Expand All @@ -300,7 +315,7 @@ default Future<T> onComplete(Handler<T> successHandler, Handler<Throwable> failu
* @return a reference to this, so it can be used fluently
*/
@Fluent
default Future<T> onSuccess(Handler<T> handler) {
default Future<T> onSuccess(Handler<? super T> handler) {
return onComplete(handler, null);
}

Expand All @@ -314,7 +329,7 @@ default Future<T> onSuccess(Handler<T> handler) {
* @return a reference to this, so it can be used fluently
*/
@Fluent
default Future<T> onFailure(Handler<Throwable> handler) {
default Future<T> onFailure(Handler<? super Throwable> handler) {
return onComplete(null, handler);
}

Expand Down Expand Up @@ -353,7 +368,7 @@ default Future<T> onFailure(Handler<Throwable> handler) {
/**
* Alias for {@link #compose(Function)}.
*/
default <U> Future<U> flatMap(Function<T, Future<U>> mapper) {
default <U> Future<U> flatMap(Function<? super T, Future<U>> mapper) {
return compose(mapper);
}

Expand All @@ -372,7 +387,7 @@ default <U> Future<U> flatMap(Function<T, Future<U>> mapper) {
* @param mapper the mapper function
* @return the composed future
*/
default <U> Future<U> compose(Function<T, Future<U>> mapper) {
default <U> Future<U> compose(Function<? super T, Future<U>> mapper) {
return compose(mapper, Future::failedFuture);
}

Expand Down Expand Up @@ -404,22 +419,38 @@ default Future<T> recover(Function<Throwable, Future<T>> mapper) {
* @param failureMapper the function mapping the failure
* @return the composed future
*/
<U> Future<U> compose(Function<T, Future<U>> successMapper, Function<Throwable, Future<U>> failureMapper);
<U> Future<U> compose(Function<? super T, Future<U>> successMapper, Function<Throwable, Future<U>> failureMapper);

/**
* Transform this future with a {@code mapper} functions.<p>
* Transform this future with a {@code mapper} function.<p>
*
* When this future (the one on which {@code transform} is called) completes, the {@code mapper} will be called with
* the async result and this mapper returns another future object. This returned future completion will complete
* the async result returning another future instance. This returned future completion will complete
* the future returned by this method call.<p>
*
* If any mapper function throws an exception, the returned future will be failed with this exception.<p>
* When {@code mapper} throws an exception, the returned future will be failed with this exception.<p>
*
* @param mapper the function mapping the future
* @return the transformed future
*/
<U> Future<U> transform(Function<AsyncResult<T>, Future<U>> mapper);

/**
* Transform this future with a {@code mapper} function.<p>
*
* When this future (the one on which {@code transform} is called) completes, the {@code mapper} will be called with
* the result/failure returning another future instance. This returned future completion will complete
* the future returned by this method call.<p>
*
* When {@code mapper} throws an exception, the returned future will be failed with this exception.<p>
*
* @param mapper the function mapping the future
* @return the transformed future
*/
default <U> Future<U> transform(BiFunction<? super T, ? super Throwable, Future<U>> mapper) {
return transform(ar -> mapper.apply(ar.succeeded() ? ar.result() : null, ar.failed() ? ar.cause() : null));
}

/**
* Compose this future with a {@code mapper} that will be always be called.
*
Expand Down Expand Up @@ -449,7 +480,7 @@ default Future<T> recover(Function<Throwable, Future<T>> mapper) {
* @param mapper the mapper function
* @return the mapped future
*/
<U> Future<U> map(Function<T, U> mapper);
<U> Future<U> map(Function<? super T, U> mapper);

/**
* Map the result of a future to a specific {@code value}.<p>
Expand Down Expand Up @@ -537,6 +568,18 @@ default Future<T> andThen(Handler<AsyncResult<T>> handler) {
});
}

/**
* Invokes the given {@code handler} upon completion.
* <p>
* If the {@code handler} throws an exception, the returned future will be failed with this exception.
*
* @param handler invoked upon completion of this future
* @return a future completed after the {@code handler} has been invoked
*/
default Future<T> andThen(Completable<? super T> handler) {
return andThen(ar -> handler.complete(ar.succeeded() ? ar.result() : null, ar.failed() ? ar.cause() : null));
}

/**
* Guard the control flow of this future with an expectation.
* <p/>
Expand Down
46 changes: 27 additions & 19 deletions vertx-core/src/main/java/io/vertx/core/Promise.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.core.impl.future.PromiseImpl;

import java.util.function.BiConsumer;

/**
* Represents the writable side of an action that may, or may not, have occurred yet.
* <p>
Expand All @@ -27,7 +29,7 @@
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
@VertxGen
public interface Promise<T> extends Handler<AsyncResult<T>> {
public interface Promise<T> extends Completable<T> {

/**
* Create a promise that hasn't completed yet
Expand All @@ -45,7 +47,6 @@ static <T> Promise<T> promise() {
* @param asyncResult the async result to handle
*/
@GenIgnore(GenIgnore.PERMITTED_TYPE)
@Override
default void handle(AsyncResult<T> asyncResult) {
if (asyncResult.succeeded()) {
complete(asyncResult.result());
Expand All @@ -54,6 +55,15 @@ default void handle(AsyncResult<T> asyncResult) {
}
}

@Override
default void complete(T result, Throwable failure) {
if (failure != null) {
handle(Future.failedFuture(failure));
} else {
handle(Future.succeededFuture(result));
}
}

/**
* Set the result. Any handler will be called, if there is one, and the promise will be marked as completed.
* <p/>
Expand All @@ -75,31 +85,29 @@ default void complete(T result) {
*/
default void complete() {
if (!tryComplete()) {
throw new IllegalStateException("Result is already complete");
throw new IllegalStateException("Promise already completed");
}
}

/**
* Set the failure. Any handler will be called, if there is one, and the future will be marked as completed.
*
* @param cause the failure cause
* @throws IllegalStateException when the promise is already completed
*/
default void fail(Throwable cause) {
if (!tryFail(cause)) {
throw new IllegalStateException("Result is already complete");
default void succeed(T result) {
if (!tryComplete(null)) {
throw new IllegalStateException("Promise already completed");
}
}

default void succeed() {
complete(null, null);
}

default void fail(Throwable failure) {
if (!tryFail(failure)) {
throw new IllegalStateException("Promise already completed");
}
}

/**
* Calls {@link #fail(Throwable)} with the {@code message}.
*
* @param message the failure message
* @throws IllegalStateException when the promise is already completed
*/
default void fail(String message) {
if (!tryFail(message)) {
throw new IllegalStateException("Result is already complete");
throw new IllegalStateException("Promise already completed");
}
}

Expand Down
Loading

0 comments on commit 79edb86

Please sign in to comment.