Skip to content

Commit

Permalink
[Java] Update tests to check for interrupted after a stalling action …
Browse files Browse the repository at this point in the history
…rather than before.
  • Loading branch information
mjpt777 committed Nov 6, 2019
1 parent fe94014 commit 5abe1cd
Show file tree
Hide file tree
Showing 29 changed files with 163 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public void shouldReceivePublishedMessageWithInterleavedAbort(final String chann

while (publication.tryClaim(MESSAGE_LENGTH, bufferClaim) < 0L)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

publishMessage(srcBuffer, publication);
Expand All @@ -96,8 +96,8 @@ public void shouldReceivePublishedMessageWithInterleavedAbort(final String chann
final int fragments = subscription.poll(fragmentHandler, FRAGMENT_COUNT_LIMIT);
if (0 == fragments)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

numFragments += fragments;
Expand All @@ -119,8 +119,8 @@ public void shouldTransferReservedValue(final String channel)
{
while (publication.tryClaim(MESSAGE_LENGTH, bufferClaim) < 0L)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

final long reservedValue = System.currentTimeMillis();
Expand All @@ -142,8 +142,8 @@ public void shouldTransferReservedValue(final String channel)

if (0 == fragments)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}
}
}
Expand All @@ -153,8 +153,8 @@ private static void publishMessage(final UnsafeBuffer srcBuffer, final Publicati
{
while (publication.offer(srcBuffer, 0, MESSAGE_LENGTH) < 0L)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ public void shouldBeAbleToQueryChannelStatusForSubscription()

while (subscription.channelStatus() == ChannelEndpointStatus.INITIALIZING)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

assertThat(subscription.channelStatus(), is(ChannelEndpointStatus.ACTIVE));
Expand All @@ -145,8 +145,8 @@ public void shouldBeAbleToQueryChannelStatusForPublication()

while (publication.channelStatus() == ChannelEndpointStatus.INITIALIZING)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

assertThat(publication.channelStatus(), is(ChannelEndpointStatus.ACTIVE));
Expand All @@ -159,8 +159,8 @@ public void shouldCatchErrorOnAddressAlreadyInUseForSubscriptions()

while (subscriptionA.channelStatus() == ChannelEndpointStatus.INITIALIZING)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

assertThat(subscriptionA.channelStatus(), is(ChannelEndpointStatus.ACTIVE));
Expand Down Expand Up @@ -189,8 +189,8 @@ public void shouldCatchErrorOnAddressAlreadyInUseForPublications()

while (publicationA.channelStatus() == ChannelEndpointStatus.INITIALIZING)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

assertThat(publicationA.channelStatus(), is(ChannelEndpointStatus.ACTIVE));
Expand Down Expand Up @@ -220,8 +220,8 @@ public void shouldNotErrorOnAddressAlreadyInUseOnActiveChannelEndpointForSubscri

while (subscriptionA.channelStatus() == ChannelEndpointStatus.INITIALIZING)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

final Subscription subscriptionB = clientB.addSubscription(URI_NO_CONFLICT, STREAM_ID);
Expand All @@ -230,8 +230,8 @@ public void shouldNotErrorOnAddressAlreadyInUseOnActiveChannelEndpointForSubscri
while (subscriptionB.channelStatus() == ChannelEndpointStatus.INITIALIZING ||
subscriptionC.channelStatus() == ChannelEndpointStatus.INITIALIZING)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

verify(errorHandlerClientC, timeout(5000)).onError(any(ChannelEndpointException.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public void shouldReceivePublishedMessage()
{
while (publication.offer(srcBuffer, i * PAYLOAD_LENGTH, PAYLOAD_LENGTH) < 0L)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}
}

Expand All @@ -80,8 +80,8 @@ public void shouldReceivePublishedMessage()
final int fragments = subscription.controlledPoll(fragmentCollector, FRAGMENT_COUNT_LIMIT);
if (0 == fragments)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}
numFragments += fragments;
}
Expand Down
4 changes: 2 additions & 2 deletions aeron-system-tests/src/test/java/io/aeron/CounterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ public void shouldCloseReadableCounterOnUnavailableCounter()

while (null == readableCounter)
{
SystemTest.checkInterruptedStatus();
SystemTest.sleep(1);
SystemTest.checkInterruptedStatus();
}

assertFalse(readableCounter.isClosed());
Expand All @@ -156,8 +156,8 @@ public void shouldCloseReadableCounterOnUnavailableCounter()

while (!readableCounter.isClosed())
{
SystemTest.checkInterruptedStatus();
SystemTest.sleep(1);
SystemTest.checkInterruptedStatus();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public void shouldPublishFromIndependentExclusivePublications(final String chann
{
while (subscription.imageCount() < 2)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

final int expectedNumberOfFragments = 778;
Expand All @@ -100,8 +100,8 @@ public void shouldPublishFromIndependentExclusivePublications(final String chann

if (0 == fragmentsRead)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

totalFragmentsRead += fragmentsRead;
Expand All @@ -116,8 +116,8 @@ private static void publishMessage(final UnsafeBuffer srcBuffer, final Exclusive
{
while (publication.offer(srcBuffer, 0, MESSAGE_LENGTH) < 0L)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,16 @@ public void shouldTimeoutImageWhenBehindForTooLongWithMaxMulticastFlowControlStr

while (!subscriptionA.isConnected() || !subscriptionB.isConnected())
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

for (int i = 0; i < numMessagesToSend; i++)
{
while (publication.offer(buffer, 0, buffer.capacity()) < 0L)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

// A keeps up
Expand Down Expand Up @@ -213,8 +213,8 @@ public void shouldSlowDownWhenBehindWithMinMulticastFlowControlStrategy()

while (!subscriptionA.isConnected() || !subscriptionB.isConnected())
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

for (long i = 0; numFragmentsFromA < numMessagesToSend || numFragmentsFromB < numMessagesToSend; i++)
Expand Down Expand Up @@ -274,8 +274,8 @@ public void shouldRemoveDeadReceiverWithMinMulticastFlowControlStrategy()

while (!subscriptionA.isConnected() || !subscriptionB.isConnected())
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

while (numFragmentsFromA < numMessagesToSend)
Expand Down Expand Up @@ -349,8 +349,8 @@ else if (Publication.NOT_CONNECTED == result)
}
}

SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();

// A keeps up
subscriptionA.poll(fragmentHandlerA, 10);
Expand Down Expand Up @@ -401,8 +401,8 @@ public void shouldRemoveDeadPreferredReceiverWithPreferredMulticastFlowControlSt

while (!subscriptionA.isConnected() || !subscriptionB.isConnected())
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

while (numFragmentsReadFromA < numMessagesToSend)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ public void shouldReceivePublishedMessage(final String channel)

while (publication.offer(srcBuffer, offset, srcBuffer.capacity()) < 0L)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

final int expectedFragmentsBecauseOfHeader = 5;
Expand All @@ -99,8 +99,8 @@ public void shouldReceivePublishedMessage(final String channel)
final int fragments = subscription.poll(assembler, FRAGMENT_COUNT_LIMIT);
if (0 == fragments)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}
numFragments += fragments;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public void shouldGapFillWhenLossOccurs() throws Exception

while ((position = publication.offer(srcBuffer)) < 0L)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}
}

Expand All @@ -117,8 +117,8 @@ public void run()
{
while (!subscription.isConnected())
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

final Image image = subscription.imageAtIndex(0);
Expand All @@ -134,6 +134,7 @@ public void run()
return;
}
}

Thread.yield();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ public void shouldCallImageHandlers(final String channel)
{
while (!subOne.isConnected() || !subTwo.isConnected() || !publication.isConnected())
{
SystemTest.checkInterruptedStatus();
Thread.yield();
aeron.conductorAgentInvoker().invoke();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

final Image image = subOne.imageAtIndex(0);
Expand All @@ -99,9 +99,9 @@ public void shouldCallImageHandlers(final String channel)

while (subOne.isConnected() || subTwo.isConnected())
{
SystemTest.checkInterruptedStatus();
Thread.yield();
aeron.conductorAgentInvoker().invoke();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

assertTrue(image.isClosed());
Expand Down Expand Up @@ -135,9 +135,9 @@ public void shouldCallImageHandlersWithPublisherOnDifferentClient(final String c
{
while (!subOne.isConnected() || !subTwo.isConnected() || !publication.isConnected())
{
SystemTest.checkInterruptedStatus();
Thread.yield();
aeron.conductorAgentInvoker().invoke();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

final Image image = subOne.imageAtIndex(0);
Expand All @@ -155,9 +155,9 @@ public void shouldCallImageHandlersWithPublisherOnDifferentClient(final String c

while (subOne.isConnected() || subTwo.isConnected())
{
SystemTest.checkInterruptedStatus();
Thread.yield();
aeron.conductorAgentInvoker().invoke();
Thread.yield();
SystemTest.checkInterruptedStatus();
}

assertTrue(image.isClosed());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public void shouldPublishFromIndependentExclusivePublications()
long resultingPosition = publication.offer(srcBuffer, 0, MESSAGE_LENGTH);
while (resultingPosition < 0)
{
SystemTest.checkInterruptedStatus();
Thread.yield();
SystemTest.checkInterruptedStatus();
resultingPosition = publication.offer(srcBuffer, 0, MESSAGE_LENGTH);
}

Expand Down
Loading

0 comments on commit 5abe1cd

Please sign in to comment.