Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ public class CancellationToken implements CancellationSignal, CancellationRespon

private volatile boolean isCancelled;

@Override
public final boolean isCancelled() {
return isCancelled;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.namehillsoftware.handoff.promises.queued;

import com.namehillsoftware.handoff.cancellation.CancellationToken;
import com.namehillsoftware.handoff.promises.Promise;
import com.namehillsoftware.handoff.promises.queued.cancellation.CancellableMessageWriter;

public class DispatchablePromise<Resolution> extends Promise<Resolution> {

private final CancellationToken cancellationToken = new CancellationToken();
private final CancellableMessageWriter<Resolution> messageWriter;

public DispatchablePromise(CancellableMessageWriter<Resolution> messageWriter) {
awaitCancellation(cancellationToken);
this.messageWriter = messageWriter;
}

public final void dispatchMessage() {
try {
resolve(messageWriter.prepareMessage(cancellationToken));
} catch (Throwable e) {
reject(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.namehillsoftware.handoff.promises.queued;

import com.namehillsoftware.handoff.promises.queued.cancellation.CancellableMessageWriter;

public class ExecutablePromise<Resolution> extends DispatchablePromise<Resolution> implements Runnable {

public ExecutablePromise(CancellableMessageWriter<Resolution> messageWriter) {
super(messageWriter);
}

@Override
public final void run() {
dispatchMessage();
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,88 +1,12 @@
package com.namehillsoftware.handoff.promises.queued;

import com.namehillsoftware.handoff.Messenger;
import com.namehillsoftware.handoff.cancellation.Cancellable;
import com.namehillsoftware.handoff.cancellation.CancellationResponse;
import com.namehillsoftware.handoff.promises.MessengerOperator;
import com.namehillsoftware.handoff.promises.Promise;
import com.namehillsoftware.handoff.promises.propagation.CancellationProxy;
import com.namehillsoftware.handoff.promises.queued.cancellation.CancellableMessageWriter;
import com.namehillsoftware.handoff.promises.queued.cancellation.CancellablePreparedMessengerOperator;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

public class QueuedPromise<Resolution> extends Promise<Resolution> {
public QueuedPromise(MessengerOperator<Resolution> task, Executor executor) {
super(new Execution.QueuedMessengerResponse<>(task, executor));
}

public class QueuedPromise<Resolution> extends ExecutablePromise<Resolution> {
public QueuedPromise(CancellableMessageWriter<Resolution> task, Executor executor) {
this(new CancellablePreparedMessengerOperator<>(task), executor);
}

public QueuedPromise(MessageWriter<Resolution> task, Executor executor) {
this(new PreparedMessengerOperator<>(task), executor);
}

private static class Execution {
static final class QueuedMessengerResponse<Resolution> implements
MessengerOperator<Resolution>,
Messenger<Resolution>,
Runnable,
Cancellable {

private final MessengerOperator<Resolution> task;
private final Executor executor;
private final CancellationProxy cancellationProxy = new CancellationProxy();
private final AtomicReference<CancellationResponse> cancellationResponse = new AtomicReference<>();

private Messenger<Resolution> resultMessenger;

QueuedMessengerResponse(MessengerOperator<Resolution> task, Executor executor) {
this.task = task;
this.executor = executor;
}

@Override
public void send(Messenger<Resolution> resultMessenger) {
resultMessenger.awaitCancellation(cancellationProxy);
this.resultMessenger = resultMessenger;
executor.execute(this);
}

@Override
public void sendResolution(Resolution resolution) {
resultMessenger.sendResolution(resolution);
}

@Override
public void sendRejection(Throwable error) {
resultMessenger.sendRejection(error);
}

@Override
public void awaitCancellation(CancellationResponse cancellationResponse) {
if (this.cancellationResponse.getAndSet(cancellationResponse) == null) {
cancellationProxy.doCancel(this);
}
}

@Override
public void run() {
try {
task.send(this);
} finally {
cancellationResponse.lazySet(null);
}
}

@Override
public void cancel() {
final CancellationResponse cancellationResponse = this.cancellationResponse.get();
if (cancellationResponse != null)
cancellationResponse.cancellationRequested();
}
}
super(task);
executor.execute(this);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public static void before() throws InterruptedException {
final CountDownLatch cancellationLatch = new CountDownLatch(1);
final CountDownLatch resultLatch = new CountDownLatch(1);

final Promise<Void> queuedPromise = new QueuedPromise<>(() -> {
final Promise<Void> queuedPromise = new QueuedPromise<>(cs -> {
cancellationLatch.await();
throw new Exception("whoops");
}, TestExecutors.TEST_EXECUTOR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class WhenTheActionIsCancelled {
public static void before() throws InterruptedException {
final CountDownLatch testReadyLatch = new CountDownLatch(1);

final Promise<String> promisedMustAction = new QueuedPromise<>(() -> {
final Promise<String> promisedMustAction = new QueuedPromise<>(cs -> {
testReadyLatch.await();
return "test";
}, TestExecutors.TEST_EXECUTOR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public static void before() {
Promise.Rejections.setUnhandledRejectionsReceiver(rejection -> unhandledRejection = true);

final CountDownLatch countDownLatch = new CountDownLatch(1);
final Promise<Object> promise = new QueuedPromise<>(() -> {
final Promise<Object> promise = new QueuedPromise<>(cs -> {
if (!countDownLatch.await(10, TimeUnit.SECONDS))
throw new TimeoutException();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public static void before() throws InterruptedException, TimeoutException {
});

final CountDownLatch countDownLatch = new CountDownLatch(1);
final Promise<Object> promise = new QueuedPromise<>(() -> {
final Promise<Object> promise = new QueuedPromise<>(cs -> {
if (!countDownLatch.await(10, TimeUnit.SECONDS))
throw new TimeoutException();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public static void before() throws InterruptedException {
// Use latches to ensure correct order of execution.
final CountDownLatch cancellationLatch = new CountDownLatch(1);
final CountDownLatch resultLatch = new CountDownLatch(1);
final Promise<String> response = new QueuedPromise<>(() -> {
final Promise<String> response = new QueuedPromise<>(cs -> {
cancellationLatch.await();
return "test";
}, TestExecutors.TEST_EXECUTOR)
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ public class WhenThePromiseIsCancelledImmediately {

@BeforeClass
public static void before() throws InterruptedException {
final QueuedPromise<String> cancellablePromise = new QueuedPromise<>((messenger) -> {
messenger.awaitCancellation(() -> messenger.sendRejection(thrownException));
final QueuedPromise<String> cancellablePromise = new QueuedPromise<>(cs -> {
if (cs.isCancelled())
throw thrownException;

return "";
}, TestExecutors.TEST_EXECUTOR);

cancellablePromise.cancel();
Expand Down
Loading