Skip to content

Commit e032855

Browse files
authored
Merge pull request #775 from Renjuju/fix-daemon-thread-exception-handling
Move throwable exception handling up a level to prevent daemon thread death
2 parents d07192c + 0064d1e commit e032855

File tree

3 files changed

+48
-8
lines changed

3 files changed

+48
-8
lines changed

amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisher.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -414,11 +414,14 @@ public void run() {
414414
makeRetrievalAttempt();
415415
} catch(PositionResetException pre) {
416416
log.debug("{} : Position was reset while attempting to add item to queue.", streamAndShardId);
417+
} catch (Throwable e) {
418+
log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." +
419+
" Please search for the exception/error online to check what is going on. If the " +
420+
"issue persists or is a recurring problem, feel free to open an issue on, " +
421+
"https://github.com/awslabs/amazon-kinesis-client.", streamAndShardId, e);
417422
} finally {
418423
resetLock.readLock().unlock();
419424
}
420-
421-
422425
}
423426
callShutdownOnStrategy();
424427
}
@@ -469,11 +472,6 @@ private void makeRetrievalAttempt() {
469472
log.error("{} : Exception thrown while fetching records from Kinesis", streamAndShardId, e);
470473
} catch (SdkException e) {
471474
log.error("{} : Exception thrown while fetching records from Kinesis", streamAndShardId, e);
472-
} catch (Throwable e) {
473-
log.error("{} : Unexpected exception was thrown. This could probably be an issue or a bug." +
474-
" Please search for the exception/error online to check what is going on. If the " +
475-
"issue persists or is a recurring problem, feel free to open an issue on, " +
476-
"https://github.com/awslabs/amazon-kinesis-client.", streamAndShardId, e);
477475
} finally {
478476
MetricsUtil.endScope(scope);
479477
}

amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherIntegrationTest.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@
2424
import static org.mockito.Matchers.anyLong;
2525
import static org.mockito.Matchers.eq;
2626
import static org.mockito.Mockito.doNothing;
27+
import static org.mockito.Mockito.doThrow;
2728
import static org.mockito.Mockito.mock;
2829
import static org.mockito.Mockito.spy;
30+
import static org.mockito.Mockito.times;
2931
import static org.mockito.Mockito.verify;
3032
import static org.mockito.Mockito.when;
33+
import static software.amazon.kinesis.utils.BlockingUtils.blockUntilConditionSatisfied;
3134
import static software.amazon.kinesis.utils.BlockingUtils.blockUntilRecordsAvailable;
3235

3336
import java.util.ArrayList;
@@ -38,6 +41,7 @@
3841
import java.util.concurrent.TimeUnit;
3942

4043
import org.junit.After;
44+
import org.junit.Assert;
4145
import org.junit.Before;
4246
import org.junit.Ignore;
4347
import org.junit.Test;
@@ -49,7 +53,6 @@
4953
import org.mockito.stubbing.Answer;
5054

5155
import software.amazon.awssdk.core.SdkBytes;
52-
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
5356
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
5457

5558
import lombok.extern.slf4j.Slf4j;
@@ -224,6 +227,25 @@ public DataFetcherResult answer(final InvocationOnMock invocationOnMock) throws
224227
verify(dataFetcher).restartIterator();
225228
}
226229

230+
@Test
231+
public void testExpiredIteratorExceptionWithInnerRestartIteratorException() {
232+
when(dataFetcher.getRecords())
233+
.thenThrow(ExpiredIteratorException.builder().message("ExpiredIterator").build())
234+
.thenCallRealMethod()
235+
.thenThrow(ExpiredIteratorException.builder().message("ExpiredIterator").build())
236+
.thenCallRealMethod();
237+
238+
doThrow(IllegalStateException.class).when(dataFetcher).restartIterator();
239+
240+
getRecordsCache.start(extendedSequenceNumber, initialPosition);
241+
242+
final boolean conditionSatisfied = blockUntilConditionSatisfied(() ->
243+
getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 5000);
244+
Assert.assertTrue(conditionSatisfied);
245+
// Asserts the exception was only thrown once for restartIterator
246+
verify(dataFetcher, times(2)).restartIterator();
247+
}
248+
227249
private RecordsRetrieved evictPublishedEvent(PrefetchRecordsPublisher publisher, String shardId) {
228250
return publisher.getPublisherSession().evictPublishedRecordAndUpdateDemand(shardId);
229251
}

amazon-kinesis-client/src/test/java/software/amazon/kinesis/retrieval/polling/PrefetchRecordsPublisherTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,26 @@ public void testExpiredIteratorException() {
448448
verify(dataFetcher).restartIterator();
449449
}
450450

451+
@Test
452+
public void testExpiredIteratorExceptionWithIllegalStateException() {
453+
// This test validates that the daemon thread doesn't die when ExpiredIteratorException occurs with an
454+
// IllegalStateException.
455+
456+
when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL))
457+
.thenThrow(ExpiredIteratorException.builder().build())
458+
.thenReturn(getRecordsResponse)
459+
.thenThrow(ExpiredIteratorException.builder().build())
460+
.thenReturn(getRecordsResponse);
461+
462+
doThrow(new IllegalStateException()).when(dataFetcher).restartIterator();
463+
464+
getRecordsCache.start(sequenceNumber, initialPosition);
465+
blockUntilConditionSatisfied(() -> getRecordsCache.getPublisherSession().prefetchRecordsQueue().size() == MAX_SIZE, 300);
466+
467+
// verify restartIterator was called
468+
verify(dataFetcher, times(2)).restartIterator();
469+
}
470+
451471
@Test
452472
public void testRetryableRetrievalExceptionContinues() {
453473

0 commit comments

Comments
 (0)