Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
/* $Id$ */
package com.linkedin.common.callback;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand All @@ -25,7 +26,7 @@
import java.util.concurrent.atomic.AtomicReference;

/**
* Simple Future that does not support cancellation.
* Simple Future that bridges a Callback to a Future and now supports optional cancellation.
*
* @author Chris Pettitt
* @version $Revision$
Expand All @@ -34,20 +35,24 @@ public class FutureCallback<T> implements Future<T>, Callback<T>
{
private final AtomicReference<Result<T>> _result = new AtomicReference<>();
private final CountDownLatch _doneLatch = new CountDownLatch(1);
private volatile boolean cancelled = false;

@Override
public boolean cancel(boolean mayInterruptIfRunning)
{
// cancellation is not supported
return false;
if (isDone()) {
return false;
} else {
cancelled = true;
_doneLatch.countDown();
return true;
}
}

@Override
public boolean isCancelled()
{

// cancellation is not supported
return false;
return cancelled;
}

@Override
Expand All @@ -60,12 +65,18 @@ public boolean isDone()
public T get() throws InterruptedException, ExecutionException
{
_doneLatch.await();
if (cancelled) {
throw new CancellationException("FutureCallback was cancelled.");
}
return unwrapResult();
}

private T getRaw() throws Throwable
{
_doneLatch.await();
if (cancelled) {
throw new CancellationException("FutureCallback was cancelled.");
}
return unwrapResultRaw();
}

Expand All @@ -78,6 +89,9 @@ public T get(final long timeout, final TimeUnit unit) throws InterruptedExceptio
{
throw new TimeoutException();
}
if (cancelled) {
throw new CancellationException("FutureCallback was cancelled.");
}
return unwrapResult();
}

Expand All @@ -87,6 +101,9 @@ private T getRaw(final long timeout, final TimeUnit unit) throws Throwable
{
throw new TimeoutException();
}
if (cancelled) {
throw new CancellationException("FutureCallback was cancelled.");
}
return unwrapResultRaw();
}

Expand Down