diff --git a/build.gradle b/build.gradle index 1facdb1..adea4eb 100644 --- a/build.gradle +++ b/build.gradle @@ -74,7 +74,7 @@ sourceSets { dependencies { implementation fileTree(dir: 'libs', include: ['*.jar']) testImplementation 'org.mockito:mockito-core:2.+' - testImplementation 'junit:junit:4.12' + testImplementation 'junit:junit:4.13.1' testImplementation 'org.assertj:assertj-core:2.+' } diff --git a/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java b/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java index 67eca0e..9d89a80 100644 --- a/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java +++ b/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java @@ -1,12 +1,16 @@ package com.namehillsoftware.handoff.promises.queued; import com.namehillsoftware.handoff.Messenger; +import com.namehillsoftware.handoff.cancellation.Cancellable; +import com.namehillsoftware.handoff.cancellation.CancellationResponse; import com.namehillsoftware.handoff.promises.MessengerOperator; import com.namehillsoftware.handoff.promises.Promise; +import com.namehillsoftware.handoff.promises.propagation.CancellationProxy; import com.namehillsoftware.handoff.promises.queued.cancellation.CancellableMessageWriter; import com.namehillsoftware.handoff.promises.queued.cancellation.CancellablePreparedMessengerOperator; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; public class QueuedPromise extends Promise { public QueuedPromise(MessengerOperator task, Executor executor) { @@ -24,10 +28,15 @@ public QueuedPromise(MessageWriter task, Executor executor) { private static class Execution { static final class QueuedMessengerResponse implements MessengerOperator, - Runnable { + Messenger, + Runnable, + Cancellable { private final MessengerOperator task; private final Executor executor; + private final CancellationProxy cancellationProxy = new CancellationProxy(); + private final AtomicReference cancellationResponse = new AtomicReference<>(); + private Messenger resultMessenger; QueuedMessengerResponse(MessengerOperator task, Executor executor) { @@ -37,13 +46,42 @@ static final class QueuedMessengerResponse implements @Override public void send(Messenger resultMessenger) { + resultMessenger.awaitCancellation(cancellationProxy); this.resultMessenger = resultMessenger; executor.execute(this); } + @Override + public void sendResolution(Resolution resolution) { + resultMessenger.sendResolution(resolution); + } + + @Override + public void sendRejection(Throwable error) { + resultMessenger.sendRejection(error); + } + + @Override + public void awaitCancellation(CancellationResponse cancellationResponse) { + if (this.cancellationResponse.getAndSet(cancellationResponse) == null) { + cancellationProxy.doCancel(this); + } + } + @Override public void run() { - task.send(resultMessenger); + try { + task.send(this); + } finally { + cancellationResponse.lazySet(null); + } + } + + @Override + public void cancel() { + final CancellationResponse cancellationResponse = this.cancellationResponse.get(); + if (cancellationResponse != null) + cancellationResponse.cancellationRequested(); } } } diff --git a/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledAndNoticed.java b/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledAndNoticed.java new file mode 100644 index 0000000..88bf603 --- /dev/null +++ b/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledAndNoticed.java @@ -0,0 +1,66 @@ +package com.namehillsoftware.handoff.promises.queued.cancellation.GivenACancellableQueuedPromise.AndMultipleCancellationResponsesAreAttached; + + +import com.namehillsoftware.handoff.promises.queued.QueuedPromise; +import com.namehillsoftware.handoff.promises.queued.cancellation.TestExecutors; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +public class WhenThePromiseIsCancelledAndNoticed { + private static final Throwable thrownException = new Exception(); + private static Throwable caughtException; + private static volatile boolean isUnexpectedCancellationCalled = false; + + @BeforeClass + public static void before() throws InterruptedException { + final CountDownLatch promiseBegunLatch = new CountDownLatch(1); + final CountDownLatch promiseLatch = new CountDownLatch(1); + final QueuedPromise cancellablePromise = new QueuedPromise<>((messenger) -> { + messenger.awaitCancellation(() -> isUnexpectedCancellationCalled = true); + messenger.awaitCancellation(() -> messenger.sendRejection(thrownException)); + + promiseBegunLatch.countDown(); + + try { + promiseLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, TestExecutors.TEST_EXECUTOR); + + final CountDownLatch rejectionLatch = new CountDownLatch(1); + cancellablePromise.then( + (r) -> { + rejectionLatch.countDown(); + return null; + }, + (exception) -> { + caughtException = exception; + rejectionLatch.countDown(); + return null; + }); + + promiseBegunLatch.await(10, TimeUnit.SECONDS); + + cancellablePromise.cancel(); + + promiseLatch.countDown(); + + rejectionLatch.await(10, TimeUnit.SECONDS); + } + + @Test + public void thenTheRejectionIsCorrect() { + assertThat(caughtException).isEqualTo(thrownException); + } + + @Test + public void thenTheUnexpectedCancellationIsNotCalled() { + assertThat(isUnexpectedCancellationCalled).isFalse(); + } +} diff --git a/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledImmediately.java b/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledImmediately.java new file mode 100644 index 0000000..c2bb705 --- /dev/null +++ b/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledImmediately.java @@ -0,0 +1,59 @@ +package com.namehillsoftware.handoff.promises.queued.cancellation.GivenACancellableQueuedPromise.AndMultipleCancellationResponsesAreAttached; + + +import com.namehillsoftware.handoff.promises.queued.QueuedPromise; +import com.namehillsoftware.handoff.promises.queued.cancellation.TestExecutors; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +public class WhenThePromiseIsCancelledImmediately { + private static final Throwable thrownException = new Exception(); + private static Throwable caughtException; + private static volatile boolean isUnexpectedCancellationCalled = false; + + @BeforeClass + public static void before() throws InterruptedException { + final CountDownLatch rejectionLatch = new CountDownLatch(1); + + final QueuedPromise cancellablePromise = new QueuedPromise<>((messenger) -> { + messenger.awaitCancellation(() -> messenger.sendRejection(thrownException)); + + try { + rejectionLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + messenger.awaitCancellation(() -> isUnexpectedCancellationCalled = true); + }, TestExecutors.TEST_EXECUTOR); + + cancellablePromise.cancel(); + + cancellablePromise.then( + (r) -> { + rejectionLatch.countDown(); + return null; + }, + (exception) -> { + caughtException = exception; + rejectionLatch.countDown(); + return null; + }); + + rejectionLatch.await(10, TimeUnit.SECONDS); + } + + @Test + public void thenTheRejectionIsCorrect() { + assertThat(caughtException).isEqualTo(thrownException); + } + + @Test + public void thenTheUnexpectedCancellationIsNotCalled() { + assertThat(isUnexpectedCancellationCalled).isFalse(); + } +} diff --git a/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/WhenThePromiseIsCancelledImmediately.java b/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/WhenThePromiseIsCancelledImmediately.java new file mode 100644 index 0000000..798f8e7 --- /dev/null +++ b/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/WhenThePromiseIsCancelledImmediately.java @@ -0,0 +1,45 @@ +package com.namehillsoftware.handoff.promises.queued.cancellation.GivenACancellableQueuedPromise; + + +import com.namehillsoftware.handoff.promises.queued.QueuedPromise; +import com.namehillsoftware.handoff.promises.queued.cancellation.TestExecutors; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +public class WhenThePromiseIsCancelledImmediately { + private static final Throwable thrownException = new Exception(); + private static volatile Throwable caughtException; + + @BeforeClass + public static void before() throws InterruptedException { + final QueuedPromise cancellablePromise = new QueuedPromise<>((messenger) -> { + messenger.awaitCancellation(() -> messenger.sendRejection(thrownException)); + }, TestExecutors.TEST_EXECUTOR); + + cancellablePromise.cancel(); + + final CountDownLatch rejectionLatch = new CountDownLatch(1); + cancellablePromise.then( + (r) -> { + rejectionLatch.countDown(); + return null; + }, + (exception) -> { + caughtException = exception; + rejectionLatch.countDown(); + return null; + }); + + rejectionLatch.await(10, TimeUnit.SECONDS); + } + + @Test + public void thenTheRejectionIsCorrect() { + assertThat(caughtException).isEqualTo(thrownException); + } +}