From 0cbbb42ee554dfa0ab2bacd6412b8c12ecd93d92 Mon Sep 17 00:00:00 2001 From: David Date: Sat, 28 Dec 2024 10:58:44 -0600 Subject: [PATCH 1/4] Proxied cancellation in queued promises --- .../promises/queued/QueuedPromise.java | 40 ++++++++++++++++- .../WhenThePromiseIsCancelledImmediately.java | 45 +++++++++++++++++++ 2 files changed, 83 insertions(+), 2 deletions(-) create mode 100644 src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/WhenThePromiseIsCancelledImmediately.java diff --git a/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java b/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java index 67eca0e..bf855f5 100644 --- a/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java +++ b/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java @@ -1,12 +1,16 @@ package com.namehillsoftware.handoff.promises.queued; import com.namehillsoftware.handoff.Messenger; +import com.namehillsoftware.handoff.cancellation.Cancellable; +import com.namehillsoftware.handoff.cancellation.CancellationResponse; import com.namehillsoftware.handoff.promises.MessengerOperator; import com.namehillsoftware.handoff.promises.Promise; +import com.namehillsoftware.handoff.promises.propagation.CancellationProxy; import com.namehillsoftware.handoff.promises.queued.cancellation.CancellableMessageWriter; import com.namehillsoftware.handoff.promises.queued.cancellation.CancellablePreparedMessengerOperator; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; public class QueuedPromise extends Promise { public QueuedPromise(MessengerOperator task, Executor executor) { @@ -24,10 +28,15 @@ public QueuedPromise(MessageWriter task, Executor executor) { private static class Execution { static final class QueuedMessengerResponse implements MessengerOperator, - Runnable { + Messenger, + Runnable, + Cancellable { private final MessengerOperator task; private final Executor executor; + private final CancellationProxy cancellationProxy = new CancellationProxy(); + private final AtomicReference cancellationResponse = new AtomicReference<>(); + private Messenger resultMessenger; QueuedMessengerResponse(MessengerOperator task, Executor executor) { @@ -37,13 +46,40 @@ static final class QueuedMessengerResponse implements @Override public void send(Messenger resultMessenger) { + resultMessenger.awaitCancellation(cancellationProxy); this.resultMessenger = resultMessenger; executor.execute(this); } + @Override + public void sendResolution(Resolution resolution) { + if (this.resultMessenger != null) + this.resultMessenger.sendResolution(resolution); + } + + @Override + public void sendRejection(Throwable error) { + if (this.resultMessenger != null) + this.resultMessenger.sendRejection(error); + } + + @Override + public void awaitCancellation(CancellationResponse cancellationResponse) { + if (this.cancellationResponse.compareAndSet(null, cancellationResponse)) { + cancellationProxy.doCancel(this); + } + } + @Override public void run() { - task.send(resultMessenger); + task.send(this); + } + + @Override + public void cancel() { + final CancellationResponse cancellationResponse = this.cancellationResponse.getAndSet(null); + if (cancellationResponse != null) + cancellationResponse.cancellationRequested(); } } } diff --git a/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/WhenThePromiseIsCancelledImmediately.java b/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/WhenThePromiseIsCancelledImmediately.java new file mode 100644 index 0000000..798f8e7 --- /dev/null +++ b/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/WhenThePromiseIsCancelledImmediately.java @@ -0,0 +1,45 @@ +package com.namehillsoftware.handoff.promises.queued.cancellation.GivenACancellableQueuedPromise; + + +import com.namehillsoftware.handoff.promises.queued.QueuedPromise; +import com.namehillsoftware.handoff.promises.queued.cancellation.TestExecutors; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +public class WhenThePromiseIsCancelledImmediately { + private static final Throwable thrownException = new Exception(); + private static volatile Throwable caughtException; + + @BeforeClass + public static void before() throws InterruptedException { + final QueuedPromise cancellablePromise = new QueuedPromise<>((messenger) -> { + messenger.awaitCancellation(() -> messenger.sendRejection(thrownException)); + }, TestExecutors.TEST_EXECUTOR); + + cancellablePromise.cancel(); + + final CountDownLatch rejectionLatch = new CountDownLatch(1); + cancellablePromise.then( + (r) -> { + rejectionLatch.countDown(); + return null; + }, + (exception) -> { + caughtException = exception; + rejectionLatch.countDown(); + return null; + }); + + rejectionLatch.await(10, TimeUnit.SECONDS); + } + + @Test + public void thenTheRejectionIsCorrect() { + assertThat(caughtException).isEqualTo(thrownException); + } +} From a287f77d919eb6f9181707b72e3f617297dab57d Mon Sep 17 00:00:00 2001 From: David Date: Sat, 28 Dec 2024 11:00:25 -0600 Subject: [PATCH 2/4] Updated JUnit dependency --- build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 1facdb1..adea4eb 100644 --- a/build.gradle +++ b/build.gradle @@ -74,7 +74,7 @@ sourceSets { dependencies { implementation fileTree(dir: 'libs', include: ['*.jar']) testImplementation 'org.mockito:mockito-core:2.+' - testImplementation 'junit:junit:4.12' + testImplementation 'junit:junit:4.13.1' testImplementation 'org.assertj:assertj-core:2.+' } From 76a8a7ae160377f6f1ad53851f7575d6be010dd8 Mon Sep 17 00:00:00 2001 From: David Date: Sat, 28 Dec 2024 11:46:48 -0600 Subject: [PATCH 3/4] Allowed changing queued cancellation response --- .../promises/queued/QueuedPromise.java | 11 ++-- .../WhenThePromiseIsCancelledAndNoticed.java | 66 +++++++++++++++++++ .../WhenThePromiseIsCancelledImmediately.java | 59 +++++++++++++++++ 3 files changed, 130 insertions(+), 6 deletions(-) create mode 100644 src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledAndNoticed.java create mode 100644 src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledImmediately.java diff --git a/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java b/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java index bf855f5..1dab214 100644 --- a/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java +++ b/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java @@ -53,19 +53,17 @@ public void send(Messenger resultMessenger) { @Override public void sendResolution(Resolution resolution) { - if (this.resultMessenger != null) - this.resultMessenger.sendResolution(resolution); + resultMessenger.sendResolution(resolution); } @Override public void sendRejection(Throwable error) { - if (this.resultMessenger != null) - this.resultMessenger.sendRejection(error); + resultMessenger.sendRejection(error); } @Override public void awaitCancellation(CancellationResponse cancellationResponse) { - if (this.cancellationResponse.compareAndSet(null, cancellationResponse)) { + if (this.cancellationResponse.getAndSet(cancellationResponse) == null) { cancellationProxy.doCancel(this); } } @@ -73,11 +71,12 @@ public void awaitCancellation(CancellationResponse cancellationResponse) { @Override public void run() { task.send(this); + cancellationResponse.lazySet(null); } @Override public void cancel() { - final CancellationResponse cancellationResponse = this.cancellationResponse.getAndSet(null); + final CancellationResponse cancellationResponse = this.cancellationResponse.get(); if (cancellationResponse != null) cancellationResponse.cancellationRequested(); } diff --git a/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledAndNoticed.java b/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledAndNoticed.java new file mode 100644 index 0000000..88bf603 --- /dev/null +++ b/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledAndNoticed.java @@ -0,0 +1,66 @@ +package com.namehillsoftware.handoff.promises.queued.cancellation.GivenACancellableQueuedPromise.AndMultipleCancellationResponsesAreAttached; + + +import com.namehillsoftware.handoff.promises.queued.QueuedPromise; +import com.namehillsoftware.handoff.promises.queued.cancellation.TestExecutors; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +public class WhenThePromiseIsCancelledAndNoticed { + private static final Throwable thrownException = new Exception(); + private static Throwable caughtException; + private static volatile boolean isUnexpectedCancellationCalled = false; + + @BeforeClass + public static void before() throws InterruptedException { + final CountDownLatch promiseBegunLatch = new CountDownLatch(1); + final CountDownLatch promiseLatch = new CountDownLatch(1); + final QueuedPromise cancellablePromise = new QueuedPromise<>((messenger) -> { + messenger.awaitCancellation(() -> isUnexpectedCancellationCalled = true); + messenger.awaitCancellation(() -> messenger.sendRejection(thrownException)); + + promiseBegunLatch.countDown(); + + try { + promiseLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }, TestExecutors.TEST_EXECUTOR); + + final CountDownLatch rejectionLatch = new CountDownLatch(1); + cancellablePromise.then( + (r) -> { + rejectionLatch.countDown(); + return null; + }, + (exception) -> { + caughtException = exception; + rejectionLatch.countDown(); + return null; + }); + + promiseBegunLatch.await(10, TimeUnit.SECONDS); + + cancellablePromise.cancel(); + + promiseLatch.countDown(); + + rejectionLatch.await(10, TimeUnit.SECONDS); + } + + @Test + public void thenTheRejectionIsCorrect() { + assertThat(caughtException).isEqualTo(thrownException); + } + + @Test + public void thenTheUnexpectedCancellationIsNotCalled() { + assertThat(isUnexpectedCancellationCalled).isFalse(); + } +} diff --git a/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledImmediately.java b/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledImmediately.java new file mode 100644 index 0000000..c2bb705 --- /dev/null +++ b/src/test/java/com/namehillsoftware/handoff/promises/queued/cancellation/GivenACancellableQueuedPromise/AndMultipleCancellationResponsesAreAttached/WhenThePromiseIsCancelledImmediately.java @@ -0,0 +1,59 @@ +package com.namehillsoftware.handoff.promises.queued.cancellation.GivenACancellableQueuedPromise.AndMultipleCancellationResponsesAreAttached; + + +import com.namehillsoftware.handoff.promises.queued.QueuedPromise; +import com.namehillsoftware.handoff.promises.queued.cancellation.TestExecutors; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +public class WhenThePromiseIsCancelledImmediately { + private static final Throwable thrownException = new Exception(); + private static Throwable caughtException; + private static volatile boolean isUnexpectedCancellationCalled = false; + + @BeforeClass + public static void before() throws InterruptedException { + final CountDownLatch rejectionLatch = new CountDownLatch(1); + + final QueuedPromise cancellablePromise = new QueuedPromise<>((messenger) -> { + messenger.awaitCancellation(() -> messenger.sendRejection(thrownException)); + + try { + rejectionLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + messenger.awaitCancellation(() -> isUnexpectedCancellationCalled = true); + }, TestExecutors.TEST_EXECUTOR); + + cancellablePromise.cancel(); + + cancellablePromise.then( + (r) -> { + rejectionLatch.countDown(); + return null; + }, + (exception) -> { + caughtException = exception; + rejectionLatch.countDown(); + return null; + }); + + rejectionLatch.await(10, TimeUnit.SECONDS); + } + + @Test + public void thenTheRejectionIsCorrect() { + assertThat(caughtException).isEqualTo(thrownException); + } + + @Test + public void thenTheUnexpectedCancellationIsNotCalled() { + assertThat(isUnexpectedCancellationCalled).isFalse(); + } +} From bf32d0e98c6252034ec2097f448040d8ce15da9d Mon Sep 17 00:00:00 2001 From: David Date: Sat, 28 Dec 2024 11:53:42 -0600 Subject: [PATCH 4/4] Always set cancellation response to null --- .../handoff/promises/queued/QueuedPromise.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java b/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java index 1dab214..9d89a80 100644 --- a/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java +++ b/src/main/java/com/namehillsoftware/handoff/promises/queued/QueuedPromise.java @@ -70,8 +70,11 @@ public void awaitCancellation(CancellationResponse cancellationResponse) { @Override public void run() { - task.send(this); - cancellationResponse.lazySet(null); + try { + task.send(this); + } finally { + cancellationResponse.lazySet(null); + } } @Override