From b6268187f5216b4ed02430acb876a84266a32dc1 Mon Sep 17 00:00:00 2001 From: Adam Gray Date: Fri, 27 Jan 2017 15:57:16 +0400 Subject: [PATCH 1/5] bump aws sdk version to 1.11.86 --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 72c4e63..d1c1375 100644 --- a/pom.xml +++ b/pom.xml @@ -23,8 +23,8 @@ - 1.7.2 - 1.11.14 + 1.7.3 + 1.11.86 1.2.1 2.6.6 From fc54ff617e3d610ad883e6930fe07f6b59edf62d Mon Sep 17 00:00:00 2001 From: Adam Gray Date: Fri, 27 Jan 2017 16:05:17 +0400 Subject: [PATCH 2/5] bump version to 1.3.1-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index d1c1375..78063ca 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-connectors jar Amazon Kinesis Connector Library - 1.3.0 + 1.3.1-SNAPSHOT The Amazon Kinesis Connector Library helps Java developers integrate Amazon Kinesis with other AWS and non-AWS services. https://aws.amazon.com/kinesis From c20bc5a7bd39c9aedd6caf3d77a9486d0dd8882c Mon Sep 17 00:00:00 2001 From: Adam Gray Date: Wed, 1 Feb 2017 19:46:45 +0400 Subject: [PATCH 3/5] add Kinesis Firehose connector --- .../KinesisConnectorConfiguration.java | 14 ++ .../connectors/firehose/FirehoseEmitter.java | 105 +++++++++ .../firehose/FirehoseTransformer.java | 19 ++ .../connectors/firehose/RecordBatcher.java | 61 +++++ .../firehose/FirehoseEmitterTest.java | 210 ++++++++++++++++++ .../firehose/RecordBatcherTest.java | 80 +++++++ 6 files changed, 489 insertions(+) create mode 100644 src/main/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseEmitter.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseTransformer.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/connectors/firehose/RecordBatcher.java create mode 100644 src/test/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseEmitterTest.java create mode 100644 src/test/java/com/amazonaws/services/kinesis/connectors/firehose/RecordBatcherTest.java diff --git a/src/main/java/com/amazonaws/services/kinesis/connectors/KinesisConnectorConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/connectors/KinesisConnectorConfiguration.java index f0568b0..d29b726 100644 --- a/src/main/java/com/amazonaws/services/kinesis/connectors/KinesisConnectorConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/connectors/KinesisConnectorConfiguration.java @@ -102,6 +102,8 @@ public class KinesisConnectorConfiguration { "elasticsearchCloudFormationSSHLocation"; public static final String PROP_ELASTICSEARCH_CLOUDFORMATION_CLUSTER_SIZE = "elasticsearchCloudFormationClusterSize"; + public static final String PROP_FIREHOSE_STREAM_NAME = "streamName"; + public static final String PROP_FIREHOSE_STREAM_REGION = "streamRegion"; // Default Connector App Constants public static final String DEFAULT_APP_NAME = "KinesisConnector"; @@ -157,6 +159,10 @@ public class KinesisConnectorConfiguration { public static final long DEFAULT_CLOUDWATCH_BUFFER_TIME = 10 * 1000L; public static final int DEFAULT_CLOUDWATCH_MAX_QUEUE_SIZE = 10000; + // Default Amazon Kinesis Firehose Constants + public static final String DEFAULT_FIREHOSE_STREAM_NAME = "OutputStream"; + public static final String DEFAULT_FIREHOSE_STREAM_REGION = "eu-west-1"; + // Default Amazon Elasticsearch Constraints public static final String DEFAULT_ELASTICSEARCH_CLUSTER_NAME = "elasticsearch"; public static final String DEFAULT_ELASTICSEARCH_ENDPOINT = "localhost"; @@ -236,6 +242,8 @@ public class KinesisConnectorConfiguration { public final String ELASTICSEARCH_CLOUDFORMATION_CLUSTER_INSTANCE_TYPE; public final String ELASTICSEARCH_CLOUDFORMATION_SSH_LOCATION; public final String ELASTICSEARCH_CLOUDFORMATION_CLUSTER_SIZE; + public final String FIREHOSE_STREAM_NAME; + public final String FIREHOSE_STREAM_REGION; /** * Configure the connector application with any set of properties that are unique to the application. Any @@ -349,6 +357,12 @@ public KinesisConnectorConfiguration(Properties properties, AWSCredentialsProvid ELASTICSEARCH_CLOUDFORMATION_CLUSTER_SIZE = properties.getProperty(PROP_ELASTICSEARCH_CLOUDFORMATION_CLUSTER_SIZE, DEFAULT_ELASTICSEARCH_CLOUDFORMATION_CLUSTER_SIZE); + FIREHOSE_STREAM_NAME = + properties.getProperty(PROP_FIREHOSE_STREAM_NAME, + DEFAULT_FIREHOSE_STREAM_NAME); + FIREHOSE_STREAM_REGION = + properties.getProperty(PROP_FIREHOSE_STREAM_REGION, + DEFAULT_FIREHOSE_STREAM_REGION); // Amazon Kinesis Client Library configuration WORKER_ID = properties.getProperty(PROP_WORKER_ID, DEFAULT_WORKER_ID); diff --git a/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseEmitter.java b/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseEmitter.java new file mode 100644 index 0000000..3a75114 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseEmitter.java @@ -0,0 +1,105 @@ +package com.amazonaws.services.kinesis.connectors.firehose; + +import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration; +import com.amazonaws.services.kinesis.connectors.UnmodifiableBuffer; +import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsync; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsyncClient; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsyncClientBuilder; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry; +import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult; +import com.amazonaws.services.kinesisfirehose.model.Record; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +/** + * Emitter to connect a Kinesis Stream to a Kinesis Firehose stream + */ +public class FirehoseEmitter implements IEmitter { + private static final Log LOG = LogFactory.getLog(FirehoseEmitter.class); + + private final AmazonKinesisFirehoseAsync firehoseClient; + private final String streamName; + private final String streamRegion; + + public FirehoseEmitter(KinesisConnectorConfiguration configuration) { + firehoseClient = AmazonKinesisFirehoseAsyncClientBuilder + .defaultClient(); + + streamName = configuration.FIREHOSE_STREAM_NAME; + streamRegion = configuration.FIREHOSE_STREAM_REGION; + } + + /** + * Emit a record buffer + * @param buffer + * The full buffer of records + * @return List A list of any records that were not accepted by Kinesis Firehose + * @throws IOException + */ + public List emit(UnmodifiableBuffer buffer) throws IOException { + + List records = buffer.getRecords(); + + List> batches = new RecordBatcher().makeBatches(records); + + ArrayList> resultFutures = new ArrayList<>(); + + Integer i = 0; + for (List batch : batches) { + LOG.info(String.format("Writing %d records to firehose stream (batch %d of %d)", batch.size(), ++i, batches.size())); + + resultFutures.add(firehoseClient.putRecordBatchAsync(new PutRecordBatchRequest() + .withRecords(batch) + .withDeliveryStreamName(streamName))); + } + + List failures = new ArrayList<>(); + + for (i = 0; i < resultFutures.size(); i++) { + try { + PutRecordBatchResult res = resultFutures.get(i).get(); + + if (res.getFailedPutCount() > 0) { + List entries = res.getRequestResponses(); + + for (Integer j = 0; j < entries.size(); j++) { + PutRecordBatchResponseEntry entry = entries.get(j); + if (entry.getErrorCode() != null) { + failures.add(batches.get(i).get(j)); + } + } + } + } catch (InterruptedException | ExecutionException e) { + LOG.warn(String.format("Caught %s while putting batch %d, %d records will be failed", + e.getClass().getName(), + i, batches.get(i).size())); + LOG.warn("Exception details: ", e); + + // This whole batch of records to the failure list + failures.addAll(batches.get(i)); + } + } + + if (!failures.isEmpty()) { + LOG.warn(String.format("%d records failed", failures.size())); + } + return failures; + } + + public void fail(List records) { + for (Record record : records) { + LOG.error("Record failed: " + record); + } + } + + public void shutdown() {} +} diff --git a/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseTransformer.java b/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseTransformer.java new file mode 100644 index 0000000..78b1162 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseTransformer.java @@ -0,0 +1,19 @@ +package com.amazonaws.services.kinesis.connectors.firehose; + +import com.amazonaws.services.kinesis.connectors.interfaces.ITransformer; +import com.amazonaws.services.kinesis.connectors.interfaces.ITransformerBase; +import com.amazonaws.services.kinesisfirehose.model.Record; + +import java.io.IOException; + +/** + * Transformation class to be used with the FirehoseEmitter + * + * @param + */ +public abstract class FirehoseTransformer implements ITransformerBase { + + @Override + public abstract Record fromClass(T record) throws IOException; + +} diff --git a/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/RecordBatcher.java b/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/RecordBatcher.java new file mode 100644 index 0000000..47a736b --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/RecordBatcher.java @@ -0,0 +1,61 @@ +package com.amazonaws.services.kinesis.connectors.firehose; + +import com.amazonaws.services.kinesis.connectors.elasticsearch.ElasticsearchEmitter; +import com.amazonaws.services.kinesisfirehose.model.Record; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Utility class to split a buffer into record batches to batches of Kinesis Firehose records of + * <= 500 records and <= 4MB in size, as per the Firehose API limits. + */ + +class RecordBatcher { + + private static final Log LOG = LogFactory.getLog(ElasticsearchEmitter.class); + + List> makeBatches(List records) { + // N.B. the case where a single record is over the byte-size limit is not handled + // here; it will be rejected by the Firehose API and classed as failed when it is + // emitted. + + Integer numRecords = 0; + Integer byteSize = 0; + List curBatch = new ArrayList<>(); + + List> outRecords = new ArrayList<>(); + + for (Record record : records) { + + Integer recSize = record.getData().remaining(); + + if (numRecords + 1 > 500 || byteSize + recSize > 4194304) { + // add current batch to the list and start a new one + outRecords.add(Collections.unmodifiableList(curBatch)); + + curBatch = new ArrayList<>(); + + numRecords = 0; + byteSize = 0; + } + + curBatch.add(record); + + numRecords++; + byteSize += recSize; + } + + if (curBatch.size() > 0) { + outRecords.add(curBatch); + } + + LOG.debug(String.format("Built %d batches", outRecords.size())); + + return Collections.unmodifiableList(outRecords); + } + +} diff --git a/src/test/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseEmitterTest.java b/src/test/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseEmitterTest.java new file mode 100644 index 0000000..827edbb --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseEmitterTest.java @@ -0,0 +1,210 @@ +package com.amazonaws.services.kinesis.connectors.firehose; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration; +import com.amazonaws.services.kinesis.connectors.UnmodifiableBuffer; +import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsync; +import com.amazonaws.services.kinesisfirehose.model.*; +import org.easymock.EasyMockSupport; +import org.easymock.IExpectationSetters; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; + +import static org.easymock.EasyMock.*; + + +public class FirehoseEmitterTest { + + private FirehoseEmitter emitter; + + private AmazonKinesisFirehoseAsync firehoseClientMock; + + private UnmodifiableBuffer buffer = createMock(UnmodifiableBuffer.class); + + @Before + public void setUp() { + Properties props = new Properties(); + AWSCredentialsProvider creds = createMock(AWSCredentialsProvider.class); + + KinesisConnectorConfiguration configuration = new KinesisConnectorConfiguration(props, creds); + emitter = new FirehoseEmitter(configuration); + + firehoseClientMock = createMock(AmazonKinesisFirehoseAsync.class); + + setField(emitter, "firehoseClient", firehoseClientMock); + setField(emitter, "streamName", "TestStream"); + } + + private void setField(Object target, String fieldName, Object value) { + try { + Class clazz = target.getClass(); + Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, value); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + private List generateRecords(Integer n) { + List records = new ArrayList<>(); + + for (Integer i = 0; i < n; i++) { + records.add(new Record() + .withData(ByteBuffer.wrap(i.toString().getBytes()))); + } + + return Collections.unmodifiableList(records); + } + + private IExpectationSetters> expectBatchRequestWith(PutRecordBatchRequest req) { + return expect(firehoseClientMock.putRecordBatchAsync(req)); + } + + private IExpectationSetters> expectBatchRequestWith(List records) { + return expectBatchRequestWith(new PutRecordBatchRequest() + .withRecords(records) + .withDeliveryStreamName("TestStream")); + } + + private Future mockFutureOf(PutRecordBatchResult resp) throws InterruptedException, ExecutionException { + Future f = createMock(Future.class); + expect(f.get()).andReturn(resp); + + replay(f); + return f; + } + + @Test + public void testNoRecordsCase() throws IOException { + expect(buffer.getRecords()).andReturn(new ArrayList()); + + replay(firehoseClientMock, buffer); + + List failures = emitter.emit(buffer); + Assert.assertTrue(failures.isEmpty()); + + verify(firehoseClientMock, buffer); + } + + @Test + public void testWorkingCaseNoFailures() throws IOException, InterruptedException, ExecutionException { + List records = generateRecords(1750); + List> batches = new RecordBatcher().makeBatches(records); + + Future mockFuture = createMock(Future.class); + + expect(buffer.getRecords()).andReturn(records); + + for (List batch : batches) { + + expectBatchRequestWith(batch) + .andReturn(mockFuture); + + expect(mockFuture.get()).andReturn(new PutRecordBatchResult().withFailedPutCount(0)); + + } + + replay(firehoseClientMock, buffer, mockFuture); + + List failures = emitter.emit(buffer); + + verify(firehoseClientMock, buffer, mockFuture); + + Assert.assertTrue(failures.isEmpty()); + } + + @Test + public void testRecordLevelFailureCase() throws IOException, ExecutionException, InterruptedException { + + List records = generateRecords(1750); + expect(buffer.getRecords()).andReturn(records); + + List> batches = new RecordBatcher().makeBatches(records); + + // pick a random record to fail + Integer failedRecordNum = new Random().nextInt(1750); + Integer failedRecordBatch = (int)(failedRecordNum.doubleValue()/500.0); + + for (Integer i = 0; i < batches.size(); i++) { + + List batch = batches.get(i); + + // Prepare PutRecordResponseEntry set + List entries = new ArrayList<>(); + Integer failIndex = failedRecordNum - failedRecordBatch * 500; // the record idx in this batch + + for (Integer j = 0; j < batch.size(); j++) { + if (i.equals(failedRecordBatch) && j.equals(failIndex)) { + entries.add(new PutRecordBatchResponseEntry() + .withErrorCode("error") + .withErrorMessage("uh oh spagetti-o")); + } else { + entries.add(new PutRecordBatchResponseEntry()); + } + } + + Future resultFuture = createMock(Future.class); + + PutRecordBatchResult res = new PutRecordBatchResult() + .withFailedPutCount(0) + .withRequestResponses(entries); + + expect(resultFuture.get()).andReturn(res.withFailedPutCount(i.equals(failedRecordBatch) ? 1 : 0)); + + expectBatchRequestWith(batches.get(i)) + .andReturn(resultFuture); + + replay(resultFuture); + } + + replay(buffer, firehoseClientMock); + + List failures = emitter.emit(buffer); + + verify(buffer, firehoseClientMock); + + Assert.assertEquals(1, failures.size()); + Assert.assertEquals(failedRecordNum.toString(), new String(failures.get(0).getData().array())); + } + + @Test + public void testAsyncFailureCase() throws IOException, ExecutionException, InterruptedException { + + List records = generateRecords(1300); + List> batches = new RecordBatcher().makeBatches(records); + + // Three batches, the second one will fail + + expectBatchRequestWith(batches.get(0)) + .andReturn(mockFutureOf(new PutRecordBatchResult().withFailedPutCount(0))); + + Future errorFuture = createMock(Future.class); + expect(errorFuture.get()).andThrow(new InterruptedException()); + + expectBatchRequestWith(batches.get(1)) + .andReturn(errorFuture); + + expectBatchRequestWith(batches.get(2)) + .andReturn(mockFutureOf(new PutRecordBatchResult().withFailedPutCount(0))); + + expect(buffer.getRecords()) + .andReturn(records); + + replay(buffer, errorFuture, firehoseClientMock); + + List failures = emitter.emit(buffer); + + Assert.assertEquals(500, failures.size()); + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/connectors/firehose/RecordBatcherTest.java b/src/test/java/com/amazonaws/services/kinesis/connectors/firehose/RecordBatcherTest.java new file mode 100644 index 0000000..a56cb98 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/connectors/firehose/RecordBatcherTest.java @@ -0,0 +1,80 @@ +package com.amazonaws.services.kinesis.connectors.firehose; + +import com.amazonaws.services.kinesisfirehose.model.Record; +import org.junit.Assert; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.util.*; + + +public class RecordBatcherTest { + + private RecordBatcher batcher = new RecordBatcher(); + + private List generateRecords(Integer n) { + List records = new ArrayList<>(); + + for (Integer i = 0; i < n; i++) { + records.add(new Record().withData(ByteBuffer.wrap(i.toString().getBytes()))); + } + + return Collections.unmodifiableList(records); + } + + @Test + public void testShouldReturnEmptyListForNoRecords() { + + List> batches = batcher.makeBatches(new ArrayList()); + Assert.assertEquals(0, batches.size()); + + } + + @Test + public void testShouldSplitRecordsByCountForMultipleBatches() { + + List inRecords = generateRecords(1250); + + List> batches = batcher.makeBatches(inRecords); + + Assert.assertEquals(3, batches.size()); + Assert.assertEquals(500, batches.get(0).size()); + Assert.assertEquals(500, batches.get(1).size()); + Assert.assertEquals(250, batches.get(2).size()); + } + + @Test + public void testShouldNotSplitRecordsByCountForOneBatch() { + + List inRecords = generateRecords(123); + + List> batches = batcher.makeBatches(inRecords); + + Assert.assertEquals(1, batches.size()); + Assert.assertEquals(123, batches.get(0).size()); + } + + @Test + public void testShouldSplitRecordsByTotalSize() { + + byte[] record1mb = new byte[1048576]; + new Random().nextBytes(record1mb); + + byte[] record2mb = new byte[2097152]; + new Random().nextBytes(record2mb); + + List inRecords = new ArrayList<>(); + inRecords.add(new Record().withData(ByteBuffer.wrap(record1mb))); + inRecords.add(new Record().withData(ByteBuffer.wrap(record1mb))); + inRecords.add(new Record().withData(ByteBuffer.wrap(record1mb))); + inRecords.add(new Record().withData(ByteBuffer.wrap(record2mb))); + inRecords.add(new Record().withData(ByteBuffer.wrap(record1mb))); + + List> batches = batcher.makeBatches(inRecords); + + Assert.assertEquals(2, batches.size()); + Assert.assertEquals(3, batches.get(0).size()); + Assert.assertEquals(2, batches.get(1).size()); + } + +} From 152e99a9154dbaca84e1d2c0afd9c513b60c28f8 Mon Sep 17 00:00:00 2001 From: Adam Gray Date: Mon, 13 Feb 2017 14:36:23 +0000 Subject: [PATCH 4/5] remove stream region parameter --- .../services/kinesis/connectors/firehose/FirehoseEmitter.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseEmitter.java b/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseEmitter.java index 3a75114..741b82e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseEmitter.java +++ b/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseEmitter.java @@ -28,14 +28,12 @@ public class FirehoseEmitter implements IEmitter { private final AmazonKinesisFirehoseAsync firehoseClient; private final String streamName; - private final String streamRegion; public FirehoseEmitter(KinesisConnectorConfiguration configuration) { firehoseClient = AmazonKinesisFirehoseAsyncClientBuilder .defaultClient(); streamName = configuration.FIREHOSE_STREAM_NAME; - streamRegion = configuration.FIREHOSE_STREAM_REGION; } /** From 5751492d97cd5a0429fc49aa520534033c3c08a3 Mon Sep 17 00:00:00 2001 From: Adam Gray Date: Mon, 13 Feb 2017 14:36:38 +0000 Subject: [PATCH 5/5] handle null records case --- .../kinesis/connectors/firehose/RecordBatcher.java | 4 ++++ .../connectors/firehose/FirehoseEmitterTest.java | 14 ++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/RecordBatcher.java b/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/RecordBatcher.java index 47a736b..6556141 100644 --- a/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/RecordBatcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/connectors/firehose/RecordBatcher.java @@ -31,6 +31,10 @@ List> makeBatches(List records) { for (Record record : records) { + if (record == null) { + continue; + } + Integer recSize = record.getData().remaining(); if (numRecords + 1 > 500 || byteSize + recSize > 4194304) { diff --git a/src/test/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseEmitterTest.java b/src/test/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseEmitterTest.java index 827edbb..4447ff6 100644 --- a/src/test/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseEmitterTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/connectors/firehose/FirehoseEmitterTest.java @@ -97,6 +97,20 @@ public void testNoRecordsCase() throws IOException { verify(firehoseClientMock, buffer); } + @Test + public void testNullRecordsCase() throws IOException { + ArrayList recs = new ArrayList(); + recs.add(null); + expect(buffer.getRecords()).andReturn(recs); + + replay(firehoseClientMock, buffer); + + List failures = emitter.emit(buffer); + Assert.assertTrue(failures.isEmpty()); + + verify(firehoseClientMock, buffer); + } + @Test public void testWorkingCaseNoFailures() throws IOException, InterruptedException, ExecutionException { List records = generateRecords(1750);