Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ sourceSets {
dependencies {
implementation fileTree(dir: 'libs', include: ['*.jar'])
testImplementation 'org.mockito:mockito-core:2.+'
testImplementation 'junit:junit:4.12'
testImplementation 'junit:junit:4.13.1'
testImplementation 'org.assertj:assertj-core:2.+'
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.namehillsoftware.handoff.promises.queued;

import com.namehillsoftware.handoff.Messenger;
import com.namehillsoftware.handoff.cancellation.Cancellable;
import com.namehillsoftware.handoff.cancellation.CancellationResponse;
import com.namehillsoftware.handoff.promises.MessengerOperator;
import com.namehillsoftware.handoff.promises.Promise;
import com.namehillsoftware.handoff.promises.propagation.CancellationProxy;
import com.namehillsoftware.handoff.promises.queued.cancellation.CancellableMessageWriter;
import com.namehillsoftware.handoff.promises.queued.cancellation.CancellablePreparedMessengerOperator;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

public class QueuedPromise<Resolution> extends Promise<Resolution> {
public QueuedPromise(MessengerOperator<Resolution> task, Executor executor) {
Expand All @@ -24,10 +28,15 @@ public QueuedPromise(MessageWriter<Resolution> task, Executor executor) {
private static class Execution {
static final class QueuedMessengerResponse<Resolution> implements
MessengerOperator<Resolution>,
Runnable {
Messenger<Resolution>,
Runnable,
Cancellable {

private final MessengerOperator<Resolution> task;
private final Executor executor;
private final CancellationProxy cancellationProxy = new CancellationProxy();
private final AtomicReference<CancellationResponse> cancellationResponse = new AtomicReference<>();

private Messenger<Resolution> resultMessenger;

QueuedMessengerResponse(MessengerOperator<Resolution> task, Executor executor) {
Expand All @@ -37,13 +46,42 @@ static final class QueuedMessengerResponse<Resolution> implements

@Override
public void send(Messenger<Resolution> resultMessenger) {
resultMessenger.awaitCancellation(cancellationProxy);
this.resultMessenger = resultMessenger;
executor.execute(this);
}

@Override
public void sendResolution(Resolution resolution) {
resultMessenger.sendResolution(resolution);
}

@Override
public void sendRejection(Throwable error) {
resultMessenger.sendRejection(error);
}

@Override
public void awaitCancellation(CancellationResponse cancellationResponse) {
if (this.cancellationResponse.getAndSet(cancellationResponse) == null) {
cancellationProxy.doCancel(this);
}
}

@Override
public void run() {
task.send(resultMessenger);
try {
task.send(this);
} finally {
cancellationResponse.lazySet(null);
}
}

@Override
public void cancel() {
final CancellationResponse cancellationResponse = this.cancellationResponse.get();
if (cancellationResponse != null)
cancellationResponse.cancellationRequested();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.namehillsoftware.handoff.promises.queued.cancellation.GivenACancellableQueuedPromise.AndMultipleCancellationResponsesAreAttached;


import com.namehillsoftware.handoff.promises.queued.QueuedPromise;
import com.namehillsoftware.handoff.promises.queued.cancellation.TestExecutors;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

public class WhenThePromiseIsCancelledAndNoticed {
private static final Throwable thrownException = new Exception();
private static Throwable caughtException;
private static volatile boolean isUnexpectedCancellationCalled = false;

@BeforeClass
public static void before() throws InterruptedException {
final CountDownLatch promiseBegunLatch = new CountDownLatch(1);
final CountDownLatch promiseLatch = new CountDownLatch(1);
final QueuedPromise<String> cancellablePromise = new QueuedPromise<>((messenger) -> {
messenger.awaitCancellation(() -> isUnexpectedCancellationCalled = true);
messenger.awaitCancellation(() -> messenger.sendRejection(thrownException));

promiseBegunLatch.countDown();

try {
promiseLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}, TestExecutors.TEST_EXECUTOR);

final CountDownLatch rejectionLatch = new CountDownLatch(1);
cancellablePromise.then(
(r) -> {
rejectionLatch.countDown();
return null;
},
(exception) -> {
caughtException = exception;
rejectionLatch.countDown();
return null;
});

promiseBegunLatch.await(10, TimeUnit.SECONDS);

cancellablePromise.cancel();

promiseLatch.countDown();

rejectionLatch.await(10, TimeUnit.SECONDS);
}

@Test
public void thenTheRejectionIsCorrect() {
assertThat(caughtException).isEqualTo(thrownException);
}

@Test
public void thenTheUnexpectedCancellationIsNotCalled() {
assertThat(isUnexpectedCancellationCalled).isFalse();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.namehillsoftware.handoff.promises.queued.cancellation.GivenACancellableQueuedPromise.AndMultipleCancellationResponsesAreAttached;


import com.namehillsoftware.handoff.promises.queued.QueuedPromise;
import com.namehillsoftware.handoff.promises.queued.cancellation.TestExecutors;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

public class WhenThePromiseIsCancelledImmediately {
private static final Throwable thrownException = new Exception();
private static Throwable caughtException;
private static volatile boolean isUnexpectedCancellationCalled = false;

@BeforeClass
public static void before() throws InterruptedException {
final CountDownLatch rejectionLatch = new CountDownLatch(1);

final QueuedPromise<String> cancellablePromise = new QueuedPromise<>((messenger) -> {
messenger.awaitCancellation(() -> messenger.sendRejection(thrownException));

try {
rejectionLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
messenger.awaitCancellation(() -> isUnexpectedCancellationCalled = true);
}, TestExecutors.TEST_EXECUTOR);

cancellablePromise.cancel();

cancellablePromise.then(
(r) -> {
rejectionLatch.countDown();
return null;
},
(exception) -> {
caughtException = exception;
rejectionLatch.countDown();
return null;
});

rejectionLatch.await(10, TimeUnit.SECONDS);
}

@Test
public void thenTheRejectionIsCorrect() {
assertThat(caughtException).isEqualTo(thrownException);
}

@Test
public void thenTheUnexpectedCancellationIsNotCalled() {
assertThat(isUnexpectedCancellationCalled).isFalse();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.namehillsoftware.handoff.promises.queued.cancellation.GivenACancellableQueuedPromise;


import com.namehillsoftware.handoff.promises.queued.QueuedPromise;
import com.namehillsoftware.handoff.promises.queued.cancellation.TestExecutors;
import org.junit.BeforeClass;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;

public class WhenThePromiseIsCancelledImmediately {
private static final Throwable thrownException = new Exception();
private static volatile Throwable caughtException;

@BeforeClass
public static void before() throws InterruptedException {
final QueuedPromise<String> cancellablePromise = new QueuedPromise<>((messenger) -> {
messenger.awaitCancellation(() -> messenger.sendRejection(thrownException));
}, TestExecutors.TEST_EXECUTOR);

cancellablePromise.cancel();

final CountDownLatch rejectionLatch = new CountDownLatch(1);
cancellablePromise.then(
(r) -> {
rejectionLatch.countDown();
return null;
},
(exception) -> {
caughtException = exception;
rejectionLatch.countDown();
return null;
});

rejectionLatch.await(10, TimeUnit.SECONDS);
}

@Test
public void thenTheRejectionIsCorrect() {
assertThat(caughtException).isEqualTo(thrownException);
}
}
Loading