diff --git a/pegasus-common/src/main/java/com/linkedin/common/callback/FutureCallback.java b/pegasus-common/src/main/java/com/linkedin/common/callback/FutureCallback.java index 3faebd0c65..a1bc7a5013 100644 --- a/pegasus-common/src/main/java/com/linkedin/common/callback/FutureCallback.java +++ b/pegasus-common/src/main/java/com/linkedin/common/callback/FutureCallback.java @@ -17,6 +17,7 @@ /* $Id$ */ package com.linkedin.common.callback; +import java.util.concurrent.CancellationException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -25,7 +26,7 @@ import java.util.concurrent.atomic.AtomicReference; /** - * Simple Future that does not support cancellation. + * Simple Future that bridges a Callback to a Future and now supports optional cancellation. * * @author Chris Pettitt * @version $Revision$ @@ -34,20 +35,24 @@ public class FutureCallback implements Future, Callback { private final AtomicReference> _result = new AtomicReference<>(); private final CountDownLatch _doneLatch = new CountDownLatch(1); + private volatile boolean cancelled = false; @Override public boolean cancel(boolean mayInterruptIfRunning) { - // cancellation is not supported - return false; + if (isDone()) { + return false; + } else { + cancelled = true; + _doneLatch.countDown(); + return true; + } } @Override public boolean isCancelled() { - - // cancellation is not supported - return false; + return cancelled; } @Override @@ -60,12 +65,18 @@ public boolean isDone() public T get() throws InterruptedException, ExecutionException { _doneLatch.await(); + if (cancelled) { + throw new CancellationException("FutureCallback was cancelled."); + } return unwrapResult(); } private T getRaw() throws Throwable { _doneLatch.await(); + if (cancelled) { + throw new CancellationException("FutureCallback was cancelled."); + } return unwrapResultRaw(); } @@ -78,6 +89,9 @@ public T get(final long timeout, final TimeUnit unit) throws InterruptedExceptio { throw new TimeoutException(); } + if (cancelled) { + throw new CancellationException("FutureCallback was cancelled."); + } return unwrapResult(); } @@ -87,6 +101,9 @@ private T getRaw(final long timeout, final TimeUnit unit) throws Throwable { throw new TimeoutException(); } + if (cancelled) { + throw new CancellationException("FutureCallback was cancelled."); + } return unwrapResultRaw(); }