diff --git a/aeron-system-tests/src/test/java/io/aeron/BufferClaimMessageTest.java b/aeron-system-tests/src/test/java/io/aeron/BufferClaimMessageTest.java index dcbe29464f..942aea1b51 100644 --- a/aeron-system-tests/src/test/java/io/aeron/BufferClaimMessageTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/BufferClaimMessageTest.java @@ -81,8 +81,8 @@ public void shouldReceivePublishedMessageWithInterleavedAbort(final String chann while (publication.tryClaim(MESSAGE_LENGTH, bufferClaim) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } publishMessage(srcBuffer, publication); @@ -96,8 +96,8 @@ public void shouldReceivePublishedMessageWithInterleavedAbort(final String chann final int fragments = subscription.poll(fragmentHandler, FRAGMENT_COUNT_LIMIT); if (0 == fragments) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } numFragments += fragments; @@ -119,8 +119,8 @@ public void shouldTransferReservedValue(final String channel) { while (publication.tryClaim(MESSAGE_LENGTH, bufferClaim) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final long reservedValue = System.currentTimeMillis(); @@ -142,8 +142,8 @@ public void shouldTransferReservedValue(final String channel) if (0 == fragments) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } } @@ -153,8 +153,8 @@ private static void publishMessage(final UnsafeBuffer srcBuffer, final Publicati { while (publication.offer(srcBuffer, 0, MESSAGE_LENGTH) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } } diff --git a/aeron-system-tests/src/test/java/io/aeron/ChannelEndpointStatusTest.java b/aeron-system-tests/src/test/java/io/aeron/ChannelEndpointStatusTest.java index 9168c85e87..b9c4e80757 100644 --- a/aeron-system-tests/src/test/java/io/aeron/ChannelEndpointStatusTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/ChannelEndpointStatusTest.java @@ -131,8 +131,8 @@ public void shouldBeAbleToQueryChannelStatusForSubscription() while (subscription.channelStatus() == ChannelEndpointStatus.INITIALIZING) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } assertThat(subscription.channelStatus(), is(ChannelEndpointStatus.ACTIVE)); @@ -145,8 +145,8 @@ public void shouldBeAbleToQueryChannelStatusForPublication() while (publication.channelStatus() == ChannelEndpointStatus.INITIALIZING) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } assertThat(publication.channelStatus(), is(ChannelEndpointStatus.ACTIVE)); @@ -159,8 +159,8 @@ public void shouldCatchErrorOnAddressAlreadyInUseForSubscriptions() while (subscriptionA.channelStatus() == ChannelEndpointStatus.INITIALIZING) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } assertThat(subscriptionA.channelStatus(), is(ChannelEndpointStatus.ACTIVE)); @@ -189,8 +189,8 @@ public void shouldCatchErrorOnAddressAlreadyInUseForPublications() while (publicationA.channelStatus() == ChannelEndpointStatus.INITIALIZING) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } assertThat(publicationA.channelStatus(), is(ChannelEndpointStatus.ACTIVE)); @@ -220,8 +220,8 @@ public void shouldNotErrorOnAddressAlreadyInUseOnActiveChannelEndpointForSubscri while (subscriptionA.channelStatus() == ChannelEndpointStatus.INITIALIZING) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final Subscription subscriptionB = clientB.addSubscription(URI_NO_CONFLICT, STREAM_ID); @@ -230,8 +230,8 @@ public void shouldNotErrorOnAddressAlreadyInUseOnActiveChannelEndpointForSubscri while (subscriptionB.channelStatus() == ChannelEndpointStatus.INITIALIZING || subscriptionC.channelStatus() == ChannelEndpointStatus.INITIALIZING) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } verify(errorHandlerClientC, timeout(5000)).onError(any(ChannelEndpointException.class)); diff --git a/aeron-system-tests/src/test/java/io/aeron/ControlledMessageTest.java b/aeron-system-tests/src/test/java/io/aeron/ControlledMessageTest.java index c169f18296..b79d2cf57c 100644 --- a/aeron-system-tests/src/test/java/io/aeron/ControlledMessageTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/ControlledMessageTest.java @@ -68,8 +68,8 @@ public void shouldReceivePublishedMessage() { while (publication.offer(srcBuffer, i * PAYLOAD_LENGTH, PAYLOAD_LENGTH) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -80,8 +80,8 @@ public void shouldReceivePublishedMessage() final int fragments = subscription.controlledPoll(fragmentCollector, FRAGMENT_COUNT_LIMIT); if (0 == fragments) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } numFragments += fragments; } diff --git a/aeron-system-tests/src/test/java/io/aeron/CounterTest.java b/aeron-system-tests/src/test/java/io/aeron/CounterTest.java index 0e89279c7a..4284f99cbc 100644 --- a/aeron-system-tests/src/test/java/io/aeron/CounterTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/CounterTest.java @@ -145,8 +145,8 @@ public void shouldCloseReadableCounterOnUnavailableCounter() while (null == readableCounter) { - SystemTest.checkInterruptedStatus(); SystemTest.sleep(1); + SystemTest.checkInterruptedStatus(); } assertFalse(readableCounter.isClosed()); @@ -156,8 +156,8 @@ public void shouldCloseReadableCounterOnUnavailableCounter() while (!readableCounter.isClosed()) { - SystemTest.checkInterruptedStatus(); SystemTest.sleep(1); + SystemTest.checkInterruptedStatus(); } } diff --git a/aeron-system-tests/src/test/java/io/aeron/ExclusivePublicationTest.java b/aeron-system-tests/src/test/java/io/aeron/ExclusivePublicationTest.java index b31c739208..a8aef343c5 100644 --- a/aeron-system-tests/src/test/java/io/aeron/ExclusivePublicationTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/ExclusivePublicationTest.java @@ -74,8 +74,8 @@ public void shouldPublishFromIndependentExclusivePublications(final String chann { while (subscription.imageCount() < 2) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final int expectedNumberOfFragments = 778; @@ -100,8 +100,8 @@ public void shouldPublishFromIndependentExclusivePublications(final String chann if (0 == fragmentsRead) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } totalFragmentsRead += fragmentsRead; @@ -116,8 +116,8 @@ private static void publishMessage(final UnsafeBuffer srcBuffer, final Exclusive { while (publication.offer(srcBuffer, 0, MESSAGE_LENGTH) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } } diff --git a/aeron-system-tests/src/test/java/io/aeron/FlowControlStrategiesTest.java b/aeron-system-tests/src/test/java/io/aeron/FlowControlStrategiesTest.java index 2c0550413c..6c1a2dcecd 100644 --- a/aeron-system-tests/src/test/java/io/aeron/FlowControlStrategiesTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/FlowControlStrategiesTest.java @@ -139,16 +139,16 @@ public void shouldTimeoutImageWhenBehindForTooLongWithMaxMulticastFlowControlStr while (!subscriptionA.isConnected() || !subscriptionB.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } for (int i = 0; i < numMessagesToSend; i++) { while (publication.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } // A keeps up @@ -213,8 +213,8 @@ public void shouldSlowDownWhenBehindWithMinMulticastFlowControlStrategy() while (!subscriptionA.isConnected() || !subscriptionB.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } for (long i = 0; numFragmentsFromA < numMessagesToSend || numFragmentsFromB < numMessagesToSend; i++) @@ -274,8 +274,8 @@ public void shouldRemoveDeadReceiverWithMinMulticastFlowControlStrategy() while (!subscriptionA.isConnected() || !subscriptionB.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } while (numFragmentsFromA < numMessagesToSend) @@ -349,8 +349,8 @@ else if (Publication.NOT_CONNECTED == result) } } - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); // A keeps up subscriptionA.poll(fragmentHandlerA, 10); @@ -401,8 +401,8 @@ public void shouldRemoveDeadPreferredReceiverWithPreferredMulticastFlowControlSt while (!subscriptionA.isConnected() || !subscriptionB.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } while (numFragmentsReadFromA < numMessagesToSend) diff --git a/aeron-system-tests/src/test/java/io/aeron/FragmentedMessageTest.java b/aeron-system-tests/src/test/java/io/aeron/FragmentedMessageTest.java index 48de4fe160..a0dc354ff0 100644 --- a/aeron-system-tests/src/test/java/io/aeron/FragmentedMessageTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/FragmentedMessageTest.java @@ -88,8 +88,8 @@ public void shouldReceivePublishedMessage(final String channel) while (publication.offer(srcBuffer, offset, srcBuffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final int expectedFragmentsBecauseOfHeader = 5; @@ -99,8 +99,8 @@ public void shouldReceivePublishedMessage(final String channel) final int fragments = subscription.poll(assembler, FRAGMENT_COUNT_LIMIT); if (0 == fragments) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } numFragments += fragments; } diff --git a/aeron-system-tests/src/test/java/io/aeron/GapFillLossTest.java b/aeron-system-tests/src/test/java/io/aeron/GapFillLossTest.java index d8beff4479..b5ef9da954 100644 --- a/aeron-system-tests/src/test/java/io/aeron/GapFillLossTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/GapFillLossTest.java @@ -90,8 +90,8 @@ public void shouldGapFillWhenLossOccurs() throws Exception while ((position = publication.offer(srcBuffer)) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -117,8 +117,8 @@ public void run() { while (!subscription.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final Image image = subscription.imageAtIndex(0); @@ -134,6 +134,7 @@ public void run() return; } } + Thread.yield(); } } diff --git a/aeron-system-tests/src/test/java/io/aeron/ImageAvailabilityTest.java b/aeron-system-tests/src/test/java/io/aeron/ImageAvailabilityTest.java index 8350d013bd..75f88c6bec 100644 --- a/aeron-system-tests/src/test/java/io/aeron/ImageAvailabilityTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/ImageAvailabilityTest.java @@ -79,9 +79,9 @@ public void shouldCallImageHandlers(final String channel) { while (!subOne.isConnected() || !subTwo.isConnected() || !publication.isConnected()) { - SystemTest.checkInterruptedStatus(); - Thread.yield(); aeron.conductorAgentInvoker().invoke(); + Thread.yield(); + SystemTest.checkInterruptedStatus(); } final Image image = subOne.imageAtIndex(0); @@ -99,9 +99,9 @@ public void shouldCallImageHandlers(final String channel) while (subOne.isConnected() || subTwo.isConnected()) { - SystemTest.checkInterruptedStatus(); - Thread.yield(); aeron.conductorAgentInvoker().invoke(); + Thread.yield(); + SystemTest.checkInterruptedStatus(); } assertTrue(image.isClosed()); @@ -135,9 +135,9 @@ public void shouldCallImageHandlersWithPublisherOnDifferentClient(final String c { while (!subOne.isConnected() || !subTwo.isConnected() || !publication.isConnected()) { - SystemTest.checkInterruptedStatus(); - Thread.yield(); aeron.conductorAgentInvoker().invoke(); + Thread.yield(); + SystemTest.checkInterruptedStatus(); } final Image image = subOne.imageAtIndex(0); @@ -155,9 +155,9 @@ public void shouldCallImageHandlersWithPublisherOnDifferentClient(final String c while (subOne.isConnected() || subTwo.isConnected()) { - SystemTest.checkInterruptedStatus(); - Thread.yield(); aeron.conductorAgentInvoker().invoke(); + Thread.yield(); + SystemTest.checkInterruptedStatus(); } assertTrue(image.isClosed()); diff --git a/aeron-system-tests/src/test/java/io/aeron/MaxPositionPublicationTest.java b/aeron-system-tests/src/test/java/io/aeron/MaxPositionPublicationTest.java index 75c8a1a72c..bb8872c7ec 100644 --- a/aeron-system-tests/src/test/java/io/aeron/MaxPositionPublicationTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/MaxPositionPublicationTest.java @@ -70,8 +70,8 @@ public void shouldPublishFromIndependentExclusivePublications() long resultingPosition = publication.offer(srcBuffer, 0, MESSAGE_LENGTH); while (resultingPosition < 0) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); resultingPosition = publication.offer(srcBuffer, 0, MESSAGE_LENGTH); } diff --git a/aeron-system-tests/src/test/java/io/aeron/MultiDestinationCastTest.java b/aeron-system-tests/src/test/java/io/aeron/MultiDestinationCastTest.java index 44416ce335..e84feb3be6 100644 --- a/aeron-system-tests/src/test/java/io/aeron/MultiDestinationCastTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/MultiDestinationCastTest.java @@ -124,8 +124,8 @@ public void shouldSpinUpAndShutdownWithDynamic() while (subscriptionA.hasNoImages() || subscriptionB.hasNoImages() || subscriptionC.hasNoImages()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -144,8 +144,8 @@ public void shouldSpinUpAndShutdownWithManual() while (subscriptionA.hasNoImages() || subscriptionB.hasNoImages() || subscriptionC.hasNoImages()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } assertFalse(clientA.isCommandActive(correlationId)); @@ -165,16 +165,16 @@ public void shouldSendToTwoPortsWithDynamic() while (subscriptionA.hasNoImages() || subscriptionB.hasNoImages() || subscriptionC.hasNoImages()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } for (int i = 0; i < numMessagesToSend; i++) { while (publication.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -206,16 +206,16 @@ public void shouldSendToTwoPortsWithDynamicSingleDriver() while (!subscriptionA.isConnected() || !subscriptionB.isConnected() || !subscriptionC.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } for (int i = 0; i < numMessagesToSend; i++) { while (publication.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -257,8 +257,8 @@ public void shouldSendToTwoPortsWithManualSingleDriver() { while (publication.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -294,16 +294,16 @@ public void shouldManuallyRemovePortDuringActiveStream() throws Exception while (!subscriptionA.isConnected() || !subscriptionB.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } for (int i = 0; i < numMessagesToSend; i++) { while (publication.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -352,16 +352,16 @@ public void shouldManuallyAddPortDuringActiveStream() throws Exception while (!subscriptionA.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } for (int i = 0; i < numMessagesToSend; i++) { while (publication.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); diff --git a/aeron-system-tests/src/test/java/io/aeron/MultiDestinationSubscriptionTest.java b/aeron-system-tests/src/test/java/io/aeron/MultiDestinationSubscriptionTest.java index c5617713d1..461a06caf0 100644 --- a/aeron-system-tests/src/test/java/io/aeron/MultiDestinationSubscriptionTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/MultiDestinationSubscriptionTest.java @@ -155,8 +155,8 @@ public void shouldSpinUpAndShutdownWithUnicast() while (subscription.hasNoImages()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -172,8 +172,8 @@ public void shouldSpinUpAndShutdownWithMulticast() while (subscription.hasNoImages()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } assertFalse(clientA.isCommandActive(correlationId)); @@ -191,8 +191,8 @@ public void shouldSpinUpAndShutdownWithDynamicMdc() while (subscription.hasNoImages()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -210,16 +210,16 @@ public void shouldSendToSingleDestinationSubscriptionWithUnicast() while (subscription.hasNoImages()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } for (int i = 0; i < numMessagesToSend; i++) { while (publicationA.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -252,16 +252,16 @@ public void shouldSendToSingleDestinationMultipleSubscriptionsWithUnicast() while (subscription.hasNoImages()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } for (int i = 0; i < numMessagesToSend; i++) { while (publicationA.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -289,16 +289,16 @@ public void shouldSendToSingleDestinationSubscriptionWithMulticast() while (subscription.hasNoImages()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } for (int i = 0; i < numMessagesToSend; i++) { while (publicationA.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -322,16 +322,16 @@ public void shouldSendToSingleDestinationSubscriptionWithDynamicMdc() while (subscription.hasNoImages()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } for (int i = 0; i < numMessagesToSend; i++) { while (publicationA.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -369,16 +369,16 @@ public void shouldSendToMultipleDestinationSubscriptionWithSameStream() while (subscription.hasNoImages()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } for (int i = 0; i < numMessagesToSendForA; i++) { while (publicationA.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -418,8 +418,8 @@ public void shouldSendToMultipleDestinationSubscriptionWithSameStream() { while (publicationB.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -478,8 +478,8 @@ public void shouldMergeStreamsFromMultiplePublicationsWithSameParams() { while (publicationA.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -487,8 +487,8 @@ public void shouldMergeStreamsFromMultiplePublicationsWithSameParams() while (publicationB.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } assertThat(subscription.poll(fragmentHandler, 10), is(0)); @@ -507,8 +507,8 @@ public void shouldMergeStreamsFromMultiplePublicationsWithSameParams() while (publicationA.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } assertThat(subscription.poll(fragmentHandler, 10), is(0)); @@ -526,8 +526,13 @@ private void pollForFragment( () -> fragmentsRead.get() > 0, (j) -> { - fragmentsRead.value += subscription.poll(handler, 10); - Thread.yield(); + final int fragments = subscription.poll(handler, 10); + if (fragments == 0) + { + Thread.yield(); + } + + fragmentsRead.value += fragments; }, Integer.MAX_VALUE, TimeUnit.MILLISECONDS.toNanos(500)); diff --git a/aeron-system-tests/src/test/java/io/aeron/MultiDriverTest.java b/aeron-system-tests/src/test/java/io/aeron/MultiDriverTest.java index 178f751d79..541c15afda 100644 --- a/aeron-system-tests/src/test/java/io/aeron/MultiDriverTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/MultiDriverTest.java @@ -116,8 +116,8 @@ public void shouldSpinUpAndShutdown() while (!subscriptionA.isConnected() && !subscriptionB.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -136,8 +136,8 @@ public void shouldJoinExistingStreamWithLockStepSendingReceiving() throws Except { while (publication.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -205,8 +205,8 @@ public void shouldJoinExistingIdleStreamWithLockStepSendingReceiving() throws Ex while (!publication.isConnected() && !subscriptionA.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final CountDownLatch newImageLatch = new CountDownLatch(1); @@ -218,8 +218,8 @@ public void shouldJoinExistingIdleStreamWithLockStepSendingReceiving() throws Ex { while (publication.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); diff --git a/aeron-system-tests/src/test/java/io/aeron/MultiSubscriberTest.java b/aeron-system-tests/src/test/java/io/aeron/MultiSubscriberTest.java index 8f16ef3c63..c06844eefa 100644 --- a/aeron-system-tests/src/test/java/io/aeron/MultiSubscriberTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/MultiSubscriberTest.java @@ -72,26 +72,26 @@ public void shouldReceiveMessageOnSeparateSubscriptions() while (!subscriptionOne.isConnected() || !subscriptionTwo.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } while (publication.offer(srcBuffer) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } while (subscriptionOne.poll(adapterOne, FRAGMENT_COUNT_LIMIT) == 0) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } while (subscriptionTwo.poll(adapterTwo, FRAGMENT_COUNT_LIMIT) == 0) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } verifyData(srcBuffer, mockFragmentHandlerOne); diff --git a/aeron-system-tests/src/test/java/io/aeron/PongTest.java b/aeron-system-tests/src/test/java/io/aeron/PongTest.java index 5864ab7ffa..31b352a591 100644 --- a/aeron-system-tests/src/test/java/io/aeron/PongTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/PongTest.java @@ -93,8 +93,8 @@ public void playPingPong() while (pingPublication.offer(buffer, 0, BitUtil.SIZE_OF_INT) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -136,8 +136,8 @@ public void playPingPongWithRestart() throws Exception while (pingPublication.offer(buffer, 0, BitUtil.SIZE_OF_INT) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -171,8 +171,8 @@ public void playPingPongWithRestart() throws Exception // wait for disconnect to ensure we stay in lock step while (pingPublication.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.sleep(100); + SystemTest.checkInterruptedStatus(); } // restart Pong side @@ -183,8 +183,8 @@ public void playPingPongWithRestart() throws Exception while (pingPublication.offer(buffer, 0, BitUtil.SIZE_OF_INT) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } SystemTest.executeUntil( @@ -221,8 +221,8 @@ public void echoPingHandler( { while (pongPublication.offer(buffer, offset, length) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } } diff --git a/aeron-system-tests/src/test/java/io/aeron/PubAndSubTest.java b/aeron-system-tests/src/test/java/io/aeron/PubAndSubTest.java index e2f1736235..df5d22db62 100644 --- a/aeron-system-tests/src/test/java/io/aeron/PubAndSubTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/PubAndSubTest.java @@ -158,8 +158,8 @@ public void shouldContinueAfterBufferRollover(final String channel) { while (publication.offer(buffer, 0, messageLength) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -206,8 +206,8 @@ public void shouldContinueAfterRolloverWithMinimalPaddingHeader(final String cha { while (publication.offer(buffer, 0, messageLength) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -232,21 +232,22 @@ public void shouldContinueAfterRolloverWithMinimalPaddingHeader(final String cha while (publication.offer(buffer, 0, messageLength) < 0L) { Thread.yield(); + SystemTest.checkInterruptedStatus(); } } // small enough to leave room for padding that is just a header while (publication.offer(buffer, 0, lastMessageLength) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } // no roll over while (publication.offer(buffer, 0, messageLength) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -322,6 +323,7 @@ public void shouldReceivePublishedMessageOneForOneWithDataLoss(final String chan while (publication.offer(buffer, 0, messageLength) < 0L) { Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger mutableInteger = new MutableInteger(); @@ -390,8 +392,8 @@ public void shouldReceivePublishedMessageBatchedWithDataLoss(final String channe { while (publication.offer(buffer, 0, messageLength) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -442,8 +444,8 @@ public void shouldContinueAfterBufferRolloverBatched(final String channel) { while (publication.offer(buffer, 0, messageLength) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -515,8 +517,8 @@ public void shouldContinueAfterBufferRolloverWithPadding(final String channel) { while (publication.offer(buffer, 0, messageLength) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -569,8 +571,8 @@ public void shouldContinueAfterBufferRolloverWithPaddingBatched(final String cha { while (publication.offer(buffer, 0, messageLength) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -679,16 +681,16 @@ public void shouldReceivePublishedMessageOneForOneWithReSubscription(final Strin while (!subscription.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } for (int i = 0; i < numMessagesToSendStageOne; i++) { while (publication.offer(buffer, 0, messageLength) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -715,8 +717,8 @@ public void shouldReceivePublishedMessageOneForOneWithReSubscription(final Strin while (!subscription.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } assertEquals(publication.position(), subscription.imageAtIndex(0).position()); @@ -725,8 +727,8 @@ public void shouldReceivePublishedMessageOneForOneWithReSubscription(final Strin { while (publication.offer(buffer, 0, messageLength) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -775,8 +777,8 @@ public void shouldFragmentExactMessageLengthsCorrectly(final String channel) { while (publication.offer(buffer, 0, messageLength) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -811,16 +813,16 @@ public void shouldNoticeDroppedSubscriber(final String channel) throws Exception while (!publication.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.sleep(1); + SystemTest.checkInterruptedStatus(); } subscription.close(); while (publication.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -830,8 +832,8 @@ private void publishMessage() while (publication.offer(buffer, 0, SIZE_OF_INT) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } } diff --git a/aeron-system-tests/src/test/java/io/aeron/PublicationUnblockTest.java b/aeron-system-tests/src/test/java/io/aeron/PublicationUnblockTest.java index 4402eb22af..91926d1bbc 100644 --- a/aeron-system-tests/src/test/java/io/aeron/PublicationUnblockTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/PublicationUnblockTest.java @@ -84,8 +84,8 @@ public void shouldUnblockNonCommittedMessage(final String channel) while (publicationOne.tryClaim(length, bufferClaim) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } bufferClaim.buffer().setMemory(bufferClaim.offset(), length, (byte)65); @@ -93,20 +93,20 @@ public void shouldUnblockNonCommittedMessage(final String channel) while (publicationTwo.offer(srcBuffer, 0, length) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } while (publicationOne.tryClaim(length, bufferClaim) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } while (publicationTwo.offer(srcBuffer, 0, length) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final int expectedFragments = 3; @@ -116,8 +116,8 @@ public void shouldUnblockNonCommittedMessage(final String channel) final int fragments = subscription.poll(fragmentHandler, FRAGMENT_COUNT_LIMIT); if (fragments == 0) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } numFragments += fragments; diff --git a/aeron-system-tests/src/test/java/io/aeron/PublishFromArbitraryPositionTest.java b/aeron-system-tests/src/test/java/io/aeron/PublishFromArbitraryPositionTest.java index 3fa2f21c08..b7e3e0abb8 100644 --- a/aeron-system-tests/src/test/java/io/aeron/PublishFromArbitraryPositionTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/PublishFromArbitraryPositionTest.java @@ -97,8 +97,8 @@ public void shouldPublishFromArbitraryJoinPosition() throws Exception { while (!publication.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final Thread t = new Thread( @@ -139,8 +139,8 @@ private static void publishMessage( { while (publication.offer(buffer, 0, 1 + rnd.nextInt(MAX_MESSAGE_LENGTH - 1)) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } } diff --git a/aeron-system-tests/src/test/java/io/aeron/SessionSpecificPublicationTest.java b/aeron-system-tests/src/test/java/io/aeron/SessionSpecificPublicationTest.java index c3e3a2ed47..ac3740a24b 100644 --- a/aeron-system-tests/src/test/java/io/aeron/SessionSpecificPublicationTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/SessionSpecificPublicationTest.java @@ -90,8 +90,8 @@ public void shouldNotCreateExclusivePublicationWhenSessionIdCollidesWithExisting { while (!publication.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final int existingSessionId = publication.sessionId(); diff --git a/aeron-system-tests/src/test/java/io/aeron/SessionSpecificSubscriptionTest.java b/aeron-system-tests/src/test/java/io/aeron/SessionSpecificSubscriptionTest.java index 8247a357e9..b0a3d453d6 100644 --- a/aeron-system-tests/src/test/java/io/aeron/SessionSpecificSubscriptionTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/SessionSpecificSubscriptionTest.java @@ -86,8 +86,8 @@ public void shouldSubscribeToSpecificSessionIdsAndWildcard() subscriptionTwo.imageCount() != 1 || subscriptionWildcard.imageCount() != 2) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } for (int i = 0; i < EXPECTED_NUMBER_OF_MESSAGES; i++) @@ -132,8 +132,8 @@ public void shouldNotSubscribeWithoutSpecificSession() { while (!publication.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } assertThat(subscription.imageCount(), is(1)); @@ -160,8 +160,8 @@ private static void publishMessage(final UnsafeBuffer buffer, final Publication { while (publication.offer(buffer, 0, MESSAGE_LENGTH) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } } diff --git a/aeron-system-tests/src/test/java/io/aeron/SpySimulatedConnectionTest.java b/aeron-system-tests/src/test/java/io/aeron/SpySimulatedConnectionTest.java index 85a933fb3c..621be0c75d 100644 --- a/aeron-system-tests/src/test/java/io/aeron/SpySimulatedConnectionTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/SpySimulatedConnectionTest.java @@ -103,8 +103,8 @@ public void shouldNotSimulateConnectionWhenNotConfiguredTo(final String channel) while (!spy.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } assertFalse(publication.isConnected()); @@ -128,16 +128,16 @@ public void shouldSimulateConnectionWithNoNetworkSubscriptions(final String chan while (!spy.isConnected() || !publication.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } for (int i = 0; i < messagesToSend; i++) { while (publication.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead = new MutableInteger(); @@ -191,8 +191,8 @@ public void shouldSimulateConnectionWithSlowNetworkSubscription(final String cha } } - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); fragmentsFromSpy += spy.poll(fragmentHandlerSpy, 10); @@ -265,27 +265,27 @@ private void waitUntilFullConnectivity() { while (!spy.isConnected() || !subscription.isConnected() || !publication.isConnected()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } // send initial message to ensure connectivity while (publication.offer(buffer, 0, buffer.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } while (spy.poll(mock(FragmentHandler.class), 1) == 0) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } while (subscription.poll(mock(FragmentHandler.class), 1) == 0) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } } diff --git a/aeron-system-tests/src/test/java/io/aeron/SpySubscriptionTest.java b/aeron-system-tests/src/test/java/io/aeron/SpySubscriptionTest.java index e7a7a439d6..8789b38fa9 100644 --- a/aeron-system-tests/src/test/java/io/aeron/SpySubscriptionTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/SpySubscriptionTest.java @@ -87,8 +87,8 @@ public void shouldReceivePublishedMessage(final String channel) { while (publication.offer(srcBuffer, i * PAYLOAD_LENGTH, PAYLOAD_LENGTH) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } diff --git a/aeron-system-tests/src/test/java/io/aeron/StopStartSecondSubscriberTest.java b/aeron-system-tests/src/test/java/io/aeron/StopStartSecondSubscriberTest.java index eace601a7d..f77ddc2bdc 100644 --- a/aeron-system-tests/src/test/java/io/aeron/StopStartSecondSubscriberTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/StopStartSecondSubscriberTest.java @@ -112,14 +112,14 @@ public void shouldReceivePublishedMessage() while (publicationOne.offer(buffer, 0, BitUtil.SIZE_OF_INT) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } while (publicationTwo.offer(buffer, 0, BitUtil.SIZE_OF_INT) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } final MutableInteger fragmentsRead1 = new MutableInteger(); @@ -244,7 +244,7 @@ private void shouldReceiveMessagesAfterStopStart( { while (!executor.awaitTermination(1, TimeUnit.SECONDS)) { - System.err.println("Still awaiting termination"); + System.err.println("awaiting termination"); } } catch (final InterruptedException ex) diff --git a/aeron-system-tests/src/test/java/io/aeron/TwoBufferOfferMessageTest.java b/aeron-system-tests/src/test/java/io/aeron/TwoBufferOfferMessageTest.java index 77b0541095..7c6faa0042 100644 --- a/aeron-system-tests/src/test/java/io/aeron/TwoBufferOfferMessageTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/TwoBufferOfferMessageTest.java @@ -122,8 +122,8 @@ private static void publishMessage( { while (publication.offer(bufferOne, 0, bufferOne.capacity(), bufferTwo, 0, bufferTwo.capacity()) < 0L) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -137,8 +137,8 @@ private void pollForMessage( final int fragments = subscription.poll(handler, FRAGMENT_COUNT_LIMIT); if (fragments == 0) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } } diff --git a/aeron-system-tests/src/test/java/io/aeron/UntetheredSubscriptionTest.java b/aeron-system-tests/src/test/java/io/aeron/UntetheredSubscriptionTest.java index 4936530df3..31b53f013f 100644 --- a/aeron-system-tests/src/test/java/io/aeron/UntetheredSubscriptionTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/UntetheredSubscriptionTest.java @@ -91,18 +91,18 @@ public void shouldBecomeUnavailableWhenNotKeepingUp(final String channel) { while (!tetheredSub.isConnected() || !untetheredSub.isConnected()) { - SystemTest.checkInterruptedStatus(); - Thread.yield(); aeron.conductorAgentInvoker().invoke(); + Thread.yield(); + SystemTest.checkInterruptedStatus(); } while (true) { if (publication.offer(srcBuffer) < 0) { - SystemTest.checkInterruptedStatus(); - Thread.yield(); aeron.conductorAgentInvoker().invoke(); + Thread.yield(); + SystemTest.checkInterruptedStatus(); } if (pollingUntethered && untetheredSub.poll(fragmentHandler, FRAGMENT_COUNT_LIMIT) > 0) @@ -119,9 +119,9 @@ public void shouldBecomeUnavailableWhenNotKeepingUp(final String channel) while (publication.offer(srcBuffer) < 0) { - SystemTest.checkInterruptedStatus(); - Thread.yield(); aeron.conductorAgentInvoker().invoke(); + Thread.yield(); + SystemTest.checkInterruptedStatus(); } return; @@ -152,18 +152,18 @@ public void shouldRejoinAfterResting(final String channel) { while (!tetheredSub.isConnected() || !untetheredSub.isConnected()) { - SystemTest.checkInterruptedStatus(); - Thread.yield(); aeron.conductorAgentInvoker().invoke(); + Thread.yield(); + SystemTest.checkInterruptedStatus(); } while (true) { if (publication.offer(srcBuffer) < 0) { - SystemTest.checkInterruptedStatus(); - Thread.yield(); aeron.conductorAgentInvoker().invoke(); + Thread.yield(); + SystemTest.checkInterruptedStatus(); } if (pollingUntethered && untetheredSub.poll(fragmentHandler, FRAGMENT_COUNT_LIMIT) > 0) @@ -177,9 +177,9 @@ public void shouldRejoinAfterResting(final String channel) { while (availableImageCount.get() < 2) { - SystemTest.checkInterruptedStatus(); - Thread.yield(); aeron.conductorAgentInvoker().invoke(); + Thread.yield(); + SystemTest.checkInterruptedStatus(); } return; diff --git a/aeron-system-tests/src/test/java/io/aeron/archive/ArchiveTest.java b/aeron-system-tests/src/test/java/io/aeron/archive/ArchiveTest.java index 759c6b3d81..77a1bec2e7 100644 --- a/aeron-system-tests/src/test/java/io/aeron/archive/ArchiveTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/archive/ArchiveTest.java @@ -24,6 +24,7 @@ import io.aeron.driver.MediaDriver; import io.aeron.driver.ThreadingMode; import io.aeron.driver.status.SystemCounterDescriptor; +import io.aeron.logbuffer.FragmentHandler; import io.aeron.logbuffer.FrameDescriptor; import io.aeron.logbuffer.Header; import org.agrona.*; @@ -338,8 +339,8 @@ public void onStart( { if (recordingEventsAdapter.poll() == 0) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -382,8 +383,8 @@ public void onStop(final long id, final long startPosition, final long stopPosit { if (recordingEventsAdapter.poll() == 0) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -474,8 +475,8 @@ public void onRecordingDescriptor( { if (controlResponseAdapter.poll() == 0) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } } @@ -553,8 +554,8 @@ private void publishDataToBeRecorded(final Publication publication, final int me throw new IllegalStateException("Publication not connected: result=" + result); } - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -603,8 +604,8 @@ private void validateReplay( final int fragments = replay.poll(this::validateFragment, 10); if (0 == fragments) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -622,8 +623,8 @@ private void validateArchiveFile(final int messageCount, final long recordingId) while (catalog.stopPosition(recordingId) != stopPosition) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } try (RecordingReader recordingReader = new RecordingReader( @@ -703,8 +704,8 @@ public void onProgress(final long recordingId0, final long startPosition, final { if (recordingEventsAdapter.poll() == 0) { - SystemTest.checkInterruptedStatus(); SystemTest.sleep(1); + SystemTest.checkInterruptedStatus(); } } } @@ -768,13 +769,14 @@ private Thread validateActiveRecordingReplay( this.messageCount = 0; remaining = totalDataLength; + final FragmentHandler fragmentHandler = this::validateFragment; while (this.messageCount < messageCount) { - final int fragments = replay.poll(this::validateFragment, 10); + final int fragments = replay.poll(fragmentHandler, 10); if (0 == fragments) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } } diff --git a/aeron-system-tests/src/test/java/io/aeron/archive/Common.java b/aeron-system-tests/src/test/java/io/aeron/archive/Common.java index fcc660765e..0502b4dd87 100644 --- a/aeron-system-tests/src/test/java/io/aeron/archive/Common.java +++ b/aeron-system-tests/src/test/java/io/aeron/archive/Common.java @@ -45,8 +45,8 @@ static int awaitRecordingCounterId(final CountersReader counters, final int sess int counterId; while (NULL_VALUE == (counterId = RecordingPos.findCounterIdBySession(counters, sessionId))) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } return counterId; @@ -62,8 +62,8 @@ static void offer(final Publication publication, final int count, final String p while (publication.offer(buffer, 0, length) <= 0) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } } @@ -78,8 +78,8 @@ static void offerToPosition(final Publication publication, final String prefix, while (publication.offer(buffer, 0, length) <= 0) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } } @@ -103,8 +103,8 @@ static void consume(final Subscription subscription, final int count, final Stri { if (0 == subscription.poll(fragmentHandler, FRAGMENT_LIMIT)) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -120,8 +120,8 @@ static void awaitPosition(final CountersReader counters, final int counterId, fi throw new IllegalStateException("count not active: " + counterId); } - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -129,8 +129,8 @@ public static void pollForSignal(final RecordingSignalAdapter recordingSignalAda { while (0 == recordingSignalAdapter.poll()) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } diff --git a/aeron-system-tests/src/test/java/io/aeron/archive/ExtendRecordingTest.java b/aeron-system-tests/src/test/java/io/aeron/archive/ExtendRecordingTest.java index 53b3bb5ce1..e2b28e2954 100644 --- a/aeron-system-tests/src/test/java/io/aeron/archive/ExtendRecordingTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/archive/ExtendRecordingTest.java @@ -213,8 +213,8 @@ private static void offer( while (publication.offer(buffer, 0, length) <= 0) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } } @@ -239,8 +239,8 @@ private static void consume( { if (0 == subscription.poll(fragmentHandler, FRAGMENT_LIMIT)) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } diff --git a/aeron-system-tests/src/test/java/io/aeron/archive/ReplayMergeTest.java b/aeron-system-tests/src/test/java/io/aeron/archive/ReplayMergeTest.java index 41a9142fb0..ef82a7503e 100644 --- a/aeron-system-tests/src/test/java/io/aeron/archive/ReplayMergeTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/archive/ReplayMergeTest.java @@ -190,8 +190,8 @@ public void shouldMergeFromReplayToLive() if (0 == replayMerge.poll(fragmentHandler, FRAGMENT_LIMIT)) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -199,8 +199,8 @@ public void shouldMergeFromReplayToLive() { if (0 == replayMerge.poll(fragmentHandler, FRAGMENT_LIMIT)) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } } @@ -222,8 +222,8 @@ private void offer(final Publication publication, final int index, final String while (publication.offer(buffer, 0, length) <= 0) { - SystemTest.checkInterruptedStatus(); Thread.yield(); + SystemTest.checkInterruptedStatus(); } }