From 913e2b53f8c103e11eb05bc26ff3aa98836d42c4 Mon Sep 17 00:00:00 2001 From: David Date: Sun, 29 Dec 2024 14:00:01 -0600 Subject: [PATCH 1/2] Added promises to dispatch messages --- .../cancellation/CancellationToken.java | 1 + .../promises/queued/DispatchablePromise.java | 24 ++++++ .../promises/queued/ExecutablePromise.java | 15 ++++ .../promises/queued/QueuedPromise.java | 82 +------------------ .../WhenCancellingTheResponse.java | 2 +- .../WhenTheActionIsCancelled.java | 2 +- .../WhenTheCancellationIsCalled.java | 2 +- .../WhenTheCancellationIsCalled.java | 2 +- .../WhenCancellingTheResponse.java | 2 +- .../WhenThePromiseIsCancelledAndNoticed.java | 66 --------------- .../WhenThePromiseIsCancelledImmediately.java | 59 ------------- .../WhenThePromiseIsCancelledImmediately.java | 7 +- 12 files changed, 53 insertions(+), 211 deletions(-) create mode 100644 src/main/java/com/namehillsoftware/handoff/promises/queued/DispatchablePromise.java create mode 100644 src/main/java/com/namehillsoftware/handoff/promises/queued/ExecutablePromise.java delete mode 100644 src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledAndNoticed.java delete mode 100644 src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledImmediately.java diff --git a/src/main/java/com/namehillsoftware/handoff/cancellation/CancellationToken.java b/src/main/java/com/namehillsoftware/handoff/cancellation/CancellationToken.java index 8fe3af4..1379c5f 100644 --- a/src/main/java/com/namehillsoftware/handoff/cancellation/CancellationToken.java +++ b/src/main/java/com/namehillsoftware/handoff/cancellation/CancellationToken.java @@ -5,6 +5,7 @@ public class CancellationToken implements CancellationSignal, CancellationRespon private volatile boolean isCancelled; + @Override public final boolean isCancelled() { return isCancelled; } diff --git a/src/main/java/com/namehillsoftware/handoff/promises/queued/DispatchablePromise.java b/src/main/java/com/namehillsoftware/handoff/promises/queued/DispatchablePromise.java new file mode 100644 index 0000000..cfa10e0 --- /dev/null +++ b/src/main/java/com/namehillsoftware/handoff/promises/queued/DispatchablePromise.java @@ -0,0 +1,24 @@ +package com.namehillsoftware.handoff.promises.queued; + +import com.namehillsoftware.handoff.cancellation.CancellationToken; +import com.namehillsoftware.handoff.promises.Promise; +import com.namehillsoftware.handoff.promises.queued.cancellation.CancellableMessageWriter; + +public class DispatchablePromise extends Promise { + + private final CancellationToken cancellationToken = new CancellationToken(); + private final CancellableMessageWriter messageWriter; + + public DispatchablePromise(CancellableMessageWriter messageWriter) { + awaitCancellation(cancellationToken); + this.messageWriter = messageWriter; + } + + public final void dispatchMessage() { + try { + resolve(messageWriter.prepareMessage(cancellationToken)); + } catch (Throwable e) { + reject(e); + } + } +} diff --git a/src/main/java/com/namehillsoftware/handoff/promises/queued/ExecutablePromise.java b/src/main/java/com/namehillsoftware/handoff/promises/queued/ExecutablePromise.java new file mode 100644 index 0000000..958562b --- /dev/null +++ b/src/main/java/com/namehillsoftware/handoff/promises/queued/ExecutablePromise.java @@ -0,0 +1,15 @@ +package com.namehillsoftware.handoff.promises.queued; + +import com.namehillsoftware.handoff.promises.queued.cancellation.CancellableMessageWriter; + +public class ExecutablePromise extends DispatchablePromise implements Runnable { + + public ExecutablePromise(CancellableMessageWriter messageWriter) { + super(messageWriter); + } + + @Override + public final void run() { + dispatchMessage(); + } +} diff --git a/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java b/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java index 9d89a80..8ed790d 100644 --- a/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java +++ b/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java @@ -1,88 +1,12 @@ package com.namehillsoftware.handoff.promises.queued; -import com.namehillsoftware.handoff.Messenger; -import com.namehillsoftware.handoff.cancellation.Cancellable; -import com.namehillsoftware.handoff.cancellation.CancellationResponse; -import com.namehillsoftware.handoff.promises.MessengerOperator; -import com.namehillsoftware.handoff.promises.Promise; -import com.namehillsoftware.handoff.promises.propagation.CancellationProxy; import com.namehillsoftware.handoff.promises.queued.cancellation.CancellableMessageWriter; -import com.namehillsoftware.handoff.promises.queued.cancellation.CancellablePreparedMessengerOperator; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicReference; - -public class QueuedPromise extends Promise { - public QueuedPromise(MessengerOperator task, Executor executor) { - super(new Execution.QueuedMessengerResponse<>(task, executor)); - } +public class QueuedPromise extends ExecutablePromise { public QueuedPromise(CancellableMessageWriter task, Executor executor) { - this(new CancellablePreparedMessengerOperator<>(task), executor); - } - - public QueuedPromise(MessageWriter task, Executor executor) { - this(new PreparedMessengerOperator<>(task), executor); - } - - private static class Execution { - static final class QueuedMessengerResponse implements - MessengerOperator, - Messenger, - Runnable, - Cancellable { - - private final MessengerOperator task; - private final Executor executor; - private final CancellationProxy cancellationProxy = new CancellationProxy(); - private final AtomicReference cancellationResponse = new AtomicReference<>(); - - private Messenger resultMessenger; - - QueuedMessengerResponse(MessengerOperator task, Executor executor) { - this.task = task; - this.executor = executor; - } - - @Override - public void send(Messenger resultMessenger) { - resultMessenger.awaitCancellation(cancellationProxy); - this.resultMessenger = resultMessenger; - executor.execute(this); - } - - @Override - public void sendResolution(Resolution resolution) { - resultMessenger.sendResolution(resolution); - } - - @Override - public void sendRejection(Throwable error) { - resultMessenger.sendRejection(error); - } - - @Override - public void awaitCancellation(CancellationResponse cancellationResponse) { - if (this.cancellationResponse.getAndSet(cancellationResponse) == null) { - cancellationProxy.doCancel(this); - } - } - - @Override - public void run() { - try { - task.send(this); - } finally { - cancellationResponse.lazySet(null); - } - } - - @Override - public void cancel() { - final CancellationResponse cancellationResponse = this.cancellationResponse.get(); - if (cancellationResponse != null) - cancellationResponse.cancellationRequested(); - } - } + super(task); + executor.execute(this); } } diff --git a/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatIsRejected/WhenCancellingTheResponse.java b/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatIsRejected/WhenCancellingTheResponse.java index 00e5c17..cf6bc9f 100644 --- a/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatIsRejected/WhenCancellingTheResponse.java +++ b/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatIsRejected/WhenCancellingTheResponse.java @@ -21,7 +21,7 @@ public static void before() throws InterruptedException { final CountDownLatch cancellationLatch = new CountDownLatch(1); final CountDownLatch resultLatch = new CountDownLatch(1); - final Promise queuedPromise = new QueuedPromise<>(() -> { + final Promise queuedPromise = new QueuedPromise<>(cs -> { cancellationLatch.await(); throw new Exception("whoops"); }, TestExecutors.TEST_EXECUTOR); diff --git a/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatResolves/AndAlwaysFinishesWithAnAction/WhenTheActionIsCancelled.java b/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatResolves/AndAlwaysFinishesWithAnAction/WhenTheActionIsCancelled.java index 4fb1e28..8cd4077 100644 --- a/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatResolves/AndAlwaysFinishesWithAnAction/WhenTheActionIsCancelled.java +++ b/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatResolves/AndAlwaysFinishesWithAnAction/WhenTheActionIsCancelled.java @@ -24,7 +24,7 @@ public class WhenTheActionIsCancelled { public static void before() throws InterruptedException { final CountDownLatch testReadyLatch = new CountDownLatch(1); - final Promise promisedMustAction = new QueuedPromise<>(() -> { + final Promise promisedMustAction = new QueuedPromise<>(cs -> { testReadyLatch.await(); return "test"; }, TestExecutors.TEST_EXECUTOR) diff --git a/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatResolves/AndInevitablyFinishesWithAnAction/AndTheActionIsCancelled/WhenTheCancellationIsCalled.java b/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatResolves/AndInevitablyFinishesWithAnAction/AndTheActionIsCancelled/WhenTheCancellationIsCalled.java index 01eda4e..7450f7b 100644 --- a/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatResolves/AndInevitablyFinishesWithAnAction/AndTheActionIsCancelled/WhenTheCancellationIsCalled.java +++ b/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatResolves/AndInevitablyFinishesWithAnAction/AndTheActionIsCancelled/WhenTheCancellationIsCalled.java @@ -25,7 +25,7 @@ public static void before() { Promise.Rejections.setUnhandledRejectionsReceiver(rejection -> unhandledRejection = true); final CountDownLatch countDownLatch = new CountDownLatch(1); - final Promise promise = new QueuedPromise<>(() -> { + final Promise promise = new QueuedPromise<>(cs -> { if (!countDownLatch.await(10, TimeUnit.SECONDS)) throw new TimeoutException(); diff --git a/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatResolves/AndTheContinuationIsCancelled/AndTheCancellationCreatesAnException/WhenTheCancellationIsCalled.java b/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatResolves/AndTheContinuationIsCancelled/AndTheCancellationCreatesAnException/WhenTheCancellationIsCalled.java index 65badfc..d0ac9dd 100644 --- a/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatResolves/AndTheContinuationIsCancelled/AndTheCancellationCreatesAnException/WhenTheCancellationIsCalled.java +++ b/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatResolves/AndTheContinuationIsCancelled/AndTheCancellationCreatesAnException/WhenTheCancellationIsCalled.java @@ -29,7 +29,7 @@ public static void before() throws InterruptedException, TimeoutException { }); final CountDownLatch countDownLatch = new CountDownLatch(1); - final Promise promise = new QueuedPromise<>(() -> { + final Promise promise = new QueuedPromise<>(cs -> { if (!countDownLatch.await(10, TimeUnit.SECONDS)) throw new TimeoutException(); diff --git a/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatResolves/WhenCancellingTheResponse.java b/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatResolves/WhenCancellingTheResponse.java index dc47271..ba3ccf2 100644 --- a/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatResolves/WhenCancellingTheResponse.java +++ b/src/test/java/com/namehillsoftware/handoff/promises/GivenAPromiseThatResolves/WhenCancellingTheResponse.java @@ -20,7 +20,7 @@ public static void before() throws InterruptedException { // Use latches to ensure correct order of execution. final CountDownLatch cancellationLatch = new CountDownLatch(1); final CountDownLatch resultLatch = new CountDownLatch(1); - final Promise response = new QueuedPromise<>(() -> { + final Promise response = new QueuedPromise<>(cs -> { cancellationLatch.await(); return "test"; }, TestExecutors.TEST_EXECUTOR) diff --git a/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledAndNoticed.java b/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledAndNoticed.java deleted file mode 100644 index 88bf603..0000000 --- a/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledAndNoticed.java +++ /dev/null @@ -1,66 +0,0 @@ -package com.namehillsoftware.handoff.promises.queued.cancellation.GivenACancellableQueuedPromise.AndMultipleCancellationResponsesAreAttached; - - -import com.namehillsoftware.handoff.promises.queued.QueuedPromise; -import com.namehillsoftware.handoff.promises.queued.cancellation.TestExecutors; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.assertj.core.api.Assertions.assertThat; - -public class WhenThePromiseIsCancelledAndNoticed { - private static final Throwable thrownException = new Exception(); - private static Throwable caughtException; - private static volatile boolean isUnexpectedCancellationCalled = false; - - @BeforeClass - public static void before() throws InterruptedException { - final CountDownLatch promiseBegunLatch = new CountDownLatch(1); - final CountDownLatch promiseLatch = new CountDownLatch(1); - final QueuedPromise cancellablePromise = new QueuedPromise<>((messenger) -> { - messenger.awaitCancellation(() -> isUnexpectedCancellationCalled = true); - messenger.awaitCancellation(() -> messenger.sendRejection(thrownException)); - - promiseBegunLatch.countDown(); - - try { - promiseLatch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - }, TestExecutors.TEST_EXECUTOR); - - final CountDownLatch rejectionLatch = new CountDownLatch(1); - cancellablePromise.then( - (r) -> { - rejectionLatch.countDown(); - return null; - }, - (exception) -> { - caughtException = exception; - rejectionLatch.countDown(); - return null; - }); - - promiseBegunLatch.await(10, TimeUnit.SECONDS); - - cancellablePromise.cancel(); - - promiseLatch.countDown(); - - rejectionLatch.await(10, TimeUnit.SECONDS); - } - - @Test - public void thenTheRejectionIsCorrect() { - assertThat(caughtException).isEqualTo(thrownException); - } - - @Test - public void thenTheUnexpectedCancellationIsNotCalled() { - assertThat(isUnexpectedCancellationCalled).isFalse(); - } -} diff --git a/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledImmediately.java b/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledImmediately.java deleted file mode 100644 index c2bb705..0000000 --- a/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledImmediately.java +++ /dev/null @@ -1,59 +0,0 @@ -package com.namehillsoftware.handoff.promises.queued.cancellation.GivenACancellableQueuedPromise.AndMultipleCancellationResponsesAreAttached; - - -import com.namehillsoftware.handoff.promises.queued.QueuedPromise; -import com.namehillsoftware.handoff.promises.queued.cancellation.TestExecutors; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import static org.assertj.core.api.Assertions.assertThat; - -public class WhenThePromiseIsCancelledImmediately { - private static final Throwable thrownException = new Exception(); - private static Throwable caughtException; - private static volatile boolean isUnexpectedCancellationCalled = false; - - @BeforeClass - public static void before() throws InterruptedException { - final CountDownLatch rejectionLatch = new CountDownLatch(1); - - final QueuedPromise cancellablePromise = new QueuedPromise<>((messenger) -> { - messenger.awaitCancellation(() -> messenger.sendRejection(thrownException)); - - try { - rejectionLatch.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - messenger.awaitCancellation(() -> isUnexpectedCancellationCalled = true); - }, TestExecutors.TEST_EXECUTOR); - - cancellablePromise.cancel(); - - cancellablePromise.then( - (r) -> { - rejectionLatch.countDown(); - return null; - }, - (exception) -> { - caughtException = exception; - rejectionLatch.countDown(); - return null; - }); - - rejectionLatch.await(10, TimeUnit.SECONDS); - } - - @Test - public void thenTheRejectionIsCorrect() { - assertThat(caughtException).isEqualTo(thrownException); - } - - @Test - public void thenTheUnexpectedCancellationIsNotCalled() { - assertThat(isUnexpectedCancellationCalled).isFalse(); - } -} diff --git a/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/WhenThePromiseIsCancelledImmediately.java b/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/WhenThePromiseIsCancelledImmediately.java index 798f8e7..e9107ef 100644 --- a/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/WhenThePromiseIsCancelledImmediately.java +++ b/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/WhenThePromiseIsCancelledImmediately.java @@ -17,8 +17,11 @@ public class WhenThePromiseIsCancelledImmediately { @BeforeClass public static void before() throws InterruptedException { - final QueuedPromise cancellablePromise = new QueuedPromise<>((messenger) -> { - messenger.awaitCancellation(() -> messenger.sendRejection(thrownException)); + final QueuedPromise cancellablePromise = new QueuedPromise<>(cs -> { + if (cs.isCancelled()) + throw thrownException; + + return ""; }, TestExecutors.TEST_EXECUTOR); cancellablePromise.cancel(); From 4682c3e346171d05b402d599efd11338a0a62882 Mon Sep 17 00:00:00 2001 From: David Date: Sun, 29 Dec 2024 14:01:48 -0600 Subject: [PATCH 2/2] Removed unused messenger operators --- .../promises/queued/MessageWriter.java | 5 ---- .../queued/PreparedMessengerOperator.java | 22 ----------------- .../CancellablePreparedMessengerOperator.java | 24 ------------------- 3 files changed, 51 deletions(-) delete mode 100644 src/main/java/com/namehillsoftware/handoff/promises/queued/MessageWriter.java delete mode 100644 src/main/java/com/namehillsoftware/handoff/promises/queued/PreparedMessengerOperator.java delete mode 100644 src/main/java/com/namehillsoftware/handoff/promises/queued/cancellation/CancellablePreparedMessengerOperator.java diff --git a/src/main/java/com/namehillsoftware/handoff/promises/queued/MessageWriter.java b/src/main/java/com/namehillsoftware/handoff/promises/queued/MessageWriter.java deleted file mode 100644 index 8d48e97..0000000 --- a/src/main/java/com/namehillsoftware/handoff/promises/queued/MessageWriter.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.namehillsoftware.handoff.promises.queued; - -public interface MessageWriter { - Resolution prepareMessage() throws Throwable; -} \ No newline at end of file diff --git a/src/main/java/com/namehillsoftware/handoff/promises/queued/PreparedMessengerOperator.java b/src/main/java/com/namehillsoftware/handoff/promises/queued/PreparedMessengerOperator.java deleted file mode 100644 index dcafadb..0000000 --- a/src/main/java/com/namehillsoftware/handoff/promises/queued/PreparedMessengerOperator.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.namehillsoftware.handoff.promises.queued; - -import com.namehillsoftware.handoff.Messenger; -import com.namehillsoftware.handoff.promises.MessengerOperator; - -public final class PreparedMessengerOperator implements MessengerOperator { - - private final MessageWriter writer; - - public PreparedMessengerOperator(MessageWriter writer) { - this.writer = writer; - } - - @Override - public void send(Messenger messenger) { - try { - messenger.sendResolution(writer.prepareMessage()); - } catch (Throwable rejection) { - messenger.sendRejection(rejection); - } - } -} diff --git a/src/main/java/com/namehillsoftware/handoff/promises/queued/cancellation/CancellablePreparedMessengerOperator.java b/src/main/java/com/namehillsoftware/handoff/promises/queued/cancellation/CancellablePreparedMessengerOperator.java deleted file mode 100644 index 0ee6c3d..0000000 --- a/src/main/java/com/namehillsoftware/handoff/promises/queued/cancellation/CancellablePreparedMessengerOperator.java +++ /dev/null @@ -1,24 +0,0 @@ -package com.namehillsoftware.handoff.promises.queued.cancellation; - -import com.namehillsoftware.handoff.Messenger; -import com.namehillsoftware.handoff.cancellation.CancellationToken; -import com.namehillsoftware.handoff.promises.MessengerOperator; - -public final class CancellablePreparedMessengerOperator implements MessengerOperator { - private final CancellableMessageWriter writer; - - public CancellablePreparedMessengerOperator(CancellableMessageWriter writer) { - this.writer = writer; - } - - @Override - public void send(Messenger messenger) { - try { - final CancellationToken cancellationToken = new CancellationToken(); - messenger.awaitCancellation(cancellationToken); - messenger.sendResolution(writer.prepareMessage(cancellationToken)); - } catch (Throwable throwable) { - messenger.sendRejection(throwable); - } - } -}