Skip to content

Commit

Permalink
refactor: change exceptionallyAsync/exceptionallyCompose* methods w…
Browse files Browse the repository at this point in the history
…ith more generic type(`CompletionStage`) in `CompletableFutureUtils` 🧬
  • Loading branch information
oldratlee committed May 12, 2024
1 parent 0d1dbf8 commit 9257996
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -941,8 +941,8 @@ public static <T, U> CompletableFuture<U> applyToEitherSuccessAsync(
* @see CompletionStage#whenComplete(BiConsumer)
* @see java.util.stream.Stream#peek(Consumer)
*/
public static <T, C extends CompletionStage<? extends T>> C peek(
C cf, BiConsumer<? super T, ? super Throwable> action) {
public static <T, C extends CompletionStage<? extends T>>
C peek(C cf, BiConsumer<? super T, ? super Throwable> action) {
cf.whenComplete(action);
return cf;
}
Expand All @@ -965,8 +965,8 @@ public static <T, C extends CompletionStage<? extends T>> C peek(
* @see CompletionStage#whenCompleteAsync(BiConsumer)
* @see java.util.stream.Stream#peek(Consumer)
*/
public static <T, C extends CompletionStage<? extends T>> C peekAsync(
C cf, BiConsumer<? super T, ? super Throwable> action) {
public static <T, C extends CompletionStage<? extends T>>
C peekAsync(C cf, BiConsumer<? super T, ? super Throwable> action) {
cf.whenCompleteAsync(action);
return cf;
}
Expand All @@ -988,8 +988,8 @@ public static <T, C extends CompletionStage<? extends T>> C peekAsync(
* @see CompletionStage#whenCompleteAsync(BiConsumer, Executor)
* @see java.util.stream.Stream#peek(Consumer)
*/
public static <T, C extends CompletionStage<? extends T>> C peekAsync(
C cf, BiConsumer<? super T, ? super Throwable> action, Executor executor) {
public static <T, C extends CompletionStage<? extends T>>
C peekAsync(C cf, BiConsumer<? super T, ? super Throwable> action, Executor executor) {
cf.whenCompleteAsync(action, executor);
return cf;
}
Expand Down Expand Up @@ -1109,8 +1109,8 @@ public static Executor delayedExecutor(long delay, TimeUnit unit, Executor execu
* if given CompletionStage completed exceptionally
* @return the new CompletableFuture
*/
public static <T> CompletableFuture<T> exceptionallyAsync(
CompletableFuture<T> cf, Function<Throwable, ? extends T> fn) {
public static <T, C extends CompletionStage<? super T>>
C exceptionallyAsync(C cf, Function<Throwable, ? extends T> fn) {
return exceptionallyAsync(cf, fn, AsyncPoolHolder.ASYNC_POOL);
}

Expand All @@ -1124,15 +1124,16 @@ public static <T> CompletableFuture<T> exceptionallyAsync(
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public static <T> CompletableFuture<T> exceptionallyAsync(
CompletableFuture<T> cf, Function<Throwable, ? extends T> fn, Executor executor) {
@SuppressWarnings("unchecked")
public static <T, C extends CompletionStage<? super T>>
C exceptionallyAsync(C cf, Function<Throwable, ? extends T> fn, Executor executor) {
if (IS_JAVA12_PLUS) {
return cf.exceptionallyAsync(fn, executor);
return (C) cf.exceptionallyAsync(fn, executor);
}
requireNonNull(fn, "fn is null");
requireNonNull(executor, "executor is null");
// below code is copied from CompletionStage#exceptionallyAsync
return cf.handle((r, ex) -> (ex == null) ? cf :
return (C) cf.handle((r, ex) -> (ex == null) ? cf :
cf.<T>handleAsync((r1, ex1) -> fn.apply(ex1), executor)
).thenCompose(x -> x);
}
Expand Down Expand Up @@ -1194,14 +1195,15 @@ C completeOnTimeout(C cf, @Nullable T value, long timeout, TimeUnit unit) {
* if given CompletionStage completed exceptionally
* @return the new CompletableFuture
*/
public static <T> CompletableFuture<T> exceptionallyCompose(
CompletableFuture<T> cf, Function<Throwable, ? extends CompletionStage<T>> fn) {
@SuppressWarnings({"unchecked", "rawtypes"})
public static <T, C extends CompletionStage<? super T>>
C exceptionallyCompose(C cf, Function<Throwable, ? extends CompletionStage<T>> fn) {
if (IS_JAVA12_PLUS) {
return cf.exceptionallyCompose(fn);
return (C) cf.exceptionallyCompose((Function) fn);
}
requireNonNull(fn, "fn is null");
// below code is copied from CompletionStage.exceptionallyCompose
return cf.handle((r, ex) -> (ex == null) ? cf : fn.apply(ex)).thenCompose(x -> x);
return (C) cf.handle((r, ex) -> (ex == null) ? cf : fn.apply(ex)).thenCompose(x -> x);
}

/**
Expand All @@ -1213,8 +1215,8 @@ public static <T> CompletableFuture<T> exceptionallyCompose(
* if given CompletionStage completed exceptionally
* @return the new CompletableFuture
*/
public static <T> CompletableFuture<T> exceptionallyComposeAsync(
CompletableFuture<T> cf, Function<Throwable, ? extends CompletionStage<T>> fn) {
public static <T, C extends CompletionStage<? super T>>
C exceptionallyComposeAsync(C cf, Function<Throwable, ? extends CompletionStage<T>> fn) {
return exceptionallyComposeAsync(cf, fn, AsyncPoolHolder.ASYNC_POOL);
}

Expand All @@ -1227,15 +1229,16 @@ public static <T> CompletableFuture<T> exceptionallyComposeAsync(
* @param executor the executor to use for asynchronous execution
* @return the new CompletableFuture
*/
public static <T> CompletableFuture<T> exceptionallyComposeAsync(
CompletableFuture<T> cf, Function<Throwable, ? extends CompletionStage<T>> fn, Executor executor) {
@SuppressWarnings({"unchecked", "rawtypes"})
public static <T, C extends CompletionStage<? super T>>
C exceptionallyComposeAsync(C cf, Function<Throwable, ? extends CompletionStage<T>> fn, Executor executor) {
if (IS_JAVA12_PLUS) {
return cf.exceptionallyComposeAsync(fn, executor);
return (C) cf.exceptionallyComposeAsync((Function) fn, executor);
}
requireNonNull(fn, "fn is null");
requireNonNull(executor, "executor is null");
// below code is copied from CompletionStage.exceptionallyComposeAsync
return cf.handle((r, ex) -> (ex == null) ? cf :
return (C) cf.handle((r, ex) -> (ex == null) ? cf :
cf.handleAsync((r1, ex1) -> fn.apply(ex1), executor).thenCompose(x -> x)
).thenCompose(x -> x);
}
Expand Down Expand Up @@ -1405,7 +1408,8 @@ public static CffuState state(CompletableFuture<?> cf) {
* @param supplier a function returning the value to be used to complete given CompletableFuture
* @return the given CompletableFuture
*/
public static <T, C extends CompletableFuture<? super T>> C completeAsync(C cf, Supplier<? extends T> supplier) {
public static <T, C extends CompletableFuture<? super T>>
C completeAsync(C cf, Supplier<? extends T> supplier) {
return completeAsync(cf, supplier, AsyncPoolHolder.ASYNC_POOL);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.JRE;

import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -1029,27 +1031,49 @@ void test_completableFutureListToArray() {
////////////////////////////////////////////////////////////////////////////////

@Test
@SuppressWarnings("UnnecessaryLocalVariable")
void checkTypeParameterDeclaration() throws Exception {
@SuppressWarnings({"UnnecessaryLocalVariable", "unused"})
void checkTypeParameterDeclaration_peek_completeAsync() throws Exception {
final CompletableFuture<Integer> f = completedFuture(42);

final CompletableFuture<? extends Integer> fe = f;
final CompletableFuture<? super Integer> fs = f;
final CompletableFuture<?> fq = f;

final BiConsumer<? super Integer, Throwable> c = (v, ex) -> {
};
CompletableFutureUtils.peek(fe, c).get();
CompletableFutureUtils.peekAsync(fe, c).get();
CompletableFutureUtils.peekAsync(fe, c, executorService).get();

final CompletableFuture<? super Integer> fs = f;
final Supplier<? extends Integer> s = () -> 0;
fs.complete(0);
CompletableFutureUtils.completeAsync(fs, s).complete(1);
CompletableFutureUtils.completeAsync(fs, s, executorService).complete(1);

CompletableFuture<?> fq = f;
orTimeout(fq, 1, TimeUnit.MILLISECONDS);
orTimeout(fs, 1, TimeUnit.MILLISECONDS);
orTimeout(fe, 1, TimeUnit.MILLISECONDS);
CompletableFuture<? extends Integer> ffe = orTimeout(fe, 1, TimeUnit.MILLISECONDS);
CompletableFuture<? super Integer> ffs = orTimeout(fs, 1, TimeUnit.MILLISECONDS);
CompletableFuture<?> ffq = orTimeout(fq, 1, TimeUnit.MILLISECONDS);
}

@Test
@EnabledForJreRange(min = JRE.JAVA_19)
@SuppressWarnings({"UnnecessaryLocalVariable", "unused", "RedundantThrows"})
void checkTypeParameterDeclaration_resultNow() throws Exception {
final CompletableFuture<Integer> f = completedFuture(42);
final CompletableFuture<? extends Integer> fe = f;
final CompletableFuture<? super Integer> fs = f;
final CompletableFuture<?> fq = f;

Integer i = f.resultNow();
Integer ii = resultNow(f);

Integer ie = fe.resultNow();
Integer iie = resultNow(fe);

Object is = fs.resultNow();
Object iis = resultNow(fs);

Object iq = fq.resultNow();
Object iiq = resultNow(fq);
}

////////////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit 9257996

Please sign in to comment.