diff --git a/src/checkstyle/checkstyle.xml b/src/checkstyle/checkstyle.xml index 4c4e8403..194131df 100644 --- a/src/checkstyle/checkstyle.xml +++ b/src/checkstyle/checkstyle.xml @@ -177,6 +177,10 @@ - + + + + + diff --git a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java index 04b11027..5f31ecdd 100644 --- a/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java +++ b/src/main/java/org/springframework/integration/aws/outbound/KplMessageHandler.java @@ -100,9 +100,9 @@ public class KplMessageHandler extends AbstractAwsMessageHandler implement private volatile ScheduledFuture flushFuture; - private long maxRecordsInFlight = 0; + private long maxInFlightRecords = 0; - private int maxRecordInFlightsSleepDurationInMillis = 100; + private int maxInFlightRecordsDuration = 100; public KplMessageHandler(KinesisProducer kinesisProducer) { Assert.notNull(kinesisProducer, "'kinesisProducer' must not be null."); @@ -121,23 +121,31 @@ public void setConverter(Converter converter) { } /** - * When in KPL mode, the setting allows handling backpressure on the KPL native process. Setting this value would enable a sleep on the KPL Thread for the specified number of milliseconds defined in maxRecordInFlightsSleepDurationInMillis. + * When in KPL mode, the setting allows handling backpressure on the KPL native process. + * Setting this value would enable a sleep on the KPL Thread for the specified number of milliseconds defined in + * maxRecordInFlightsSleepDurationInMillis. * - * @param maxRecordsInFlight Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled. Specify a positive value to enable back pressure. + * @param maxRecordsInFlight Defaulted to 0. Value of 0 indicates that Backpressure handling is not enabled. + * Specify a positive value to enable back pressure. + * @since 3.0.9 */ - public void setMaxOutstandingRecordsInFlight(long maxRecordsInFlight) { + public void setMaxRecordsInFlight(long maxRecordsInFlight) { Assert.isTrue(maxRecordsInFlight > 0, "'maxRecordsInFlight must be greater than 0."); - this.maxRecordsInFlight = maxRecordsInFlight; + this.maxInFlightRecords = maxRecordsInFlight; } /** - * The setting allows handling backpressure on the KPL native process. Enabled when maxOutstandingRecordsCount is greater than 0. The configurations puts the KPL Thread to sleep for the specified number of milliseconds. + * The setting allows handling backpressure on the KPL native process. + * Enabled when maxOutstandingRecordsCount is greater than 0. + * The configurations puts the KPL Thread to sleep for the specified number of milliseconds. * - * @param maxRecordInFlightsSleepDurationInMillis Default is 100ms. + * @param maxInFlightRecordsDuration Default is 100ms. + * @since 3.0.9 */ - public void setMaxRecordInFlightsSleepDurationInMillis(int maxRecordInFlightsSleepDurationInMillis) { - Assert.isTrue(maxRecordInFlightsSleepDurationInMillis > 0, "'maxRecordInFlightsSleepDurationInMillis must be greater than 0."); - this.maxRecordInFlightsSleepDurationInMillis = maxRecordInFlightsSleepDurationInMillis; + public void setMaxInFlightRecordsDuration(int maxInFlightRecordsDuration) { + Assert.isTrue(maxInFlightRecordsDuration > 0, + "'maxRecordInFlightsSleepDurationInMillis must be greater than 0."); + this.maxInFlightRecordsDuration = maxInFlightRecordsDuration; } /** @@ -394,9 +402,10 @@ private void setGlueSchemaIntoUserRecordIfAny(UserRecord userRecord, Message } private CompletableFuture handleUserRecord(UserRecord userRecord) { - if (this.maxRecordsInFlight != -1 && this.kinesisProducer.getOutstandingRecordsCount() > this.maxRecordsInFlight) { + if (this.maxInFlightRecords != -1 && + this.kinesisProducer.getOutstandingRecordsCount() > this.maxInFlightRecords) { try { - Thread.sleep(this.maxRecordInFlightsSleepDurationInMillis); + Thread.sleep(this.maxInFlightRecordsDuration); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -439,7 +448,8 @@ private PutRecordRequest buildPutRecordRequest(Message message) { if (!StringUtils.hasText(partitionKey) && this.partitionKeyExpression != null) { partitionKey = this.partitionKeyExpression.getValue(getEvaluationContext(), message, String.class); } - Assert.state(partitionKey != null, "'partitionKey' must not be null for sending a Kinesis record. " + Assert.state(partitionKey != null, + "'partitionKey' must not be null for sending a Kinesis record. " + "Consider configuring this handler with a 'partitionKey'( or 'partitionKeyExpression') " + "or supply an 'aws_partitionKey' message header."); diff --git a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTest.java b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java similarity index 96% rename from src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTest.java rename to src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java index 93fe2c7e..4a0b1ebe 100644 --- a/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTest.java +++ b/src/test/java/org/springframework/integration/aws/outbound/KplMessageHandlerTests.java @@ -47,11 +47,11 @@ /** * @author Siddharth Jain * - * @since 4.0 + * @since 3.0.9 */ @SpringJUnitConfig @DirtiesContext -public class KplMessageHandlerTest { +public class KplMessageHandlerTests { @Autowired protected KinesisProducer kinesisProducer; @@ -92,8 +92,8 @@ void testKPLMessageHandler_raw_payload_success() { void testKPLMessageHandler_raw_payload_success_backpressure_test() { given(this.kinesisProducer.addUserRecord(any(UserRecord.class))) .willReturn(mock(ListenableFuture.class)); - this.kplMessageHandler.setMaxOutstandingRecordsInFlight(1); - this.kplMessageHandler.setMaxRecordInFlightsSleepDurationInMillis(100); + this.kplMessageHandler.setMaxRecordsInFlight(1); + this.kplMessageHandler.setMaxInFlightRecordsDuration(100); given(this.kinesisProducer.getOutstandingRecordsCount()).willReturn(2); final Message message = MessageBuilder .withPayload("message1") @@ -139,4 +139,5 @@ public MessageHandler kplMessageHandler(KinesisProducer kinesisProducer) { return kplMessageHandler; } } + }