This repository has been archived by the owner on Mar 30, 2021. It is now read-only.

Add firehose connector #79

Open · wants to merge 5 commits into master
6 changes: 3 additions & 3 deletions pom.xml
@@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-connectors</artifactId>
<packaging>jar</packaging>
<name>Amazon Kinesis Connector Library</name>
-<version>1.3.0</version>
+<version>1.3.1-SNAPSHOT</version>
<description>The Amazon Kinesis Connector Library helps Java developers integrate Amazon Kinesis with other AWS and non-AWS services.</description>
<url>https://aws.amazon.com/kinesis</url>

@@ -23,8 +23,8 @@
</licenses>

<properties>
-<amazon-kinesis-client.version>1.7.2</amazon-kinesis-client.version>
-<aws-java-sdk.version>1.11.14</aws-java-sdk.version>
+<amazon-kinesis-client.version>1.7.3</amazon-kinesis-client.version>
+<aws-java-sdk.version>1.11.86</aws-java-sdk.version>
<elasticsearch.version>1.2.1</elasticsearch.version>
<fasterxml-jackson.version>2.6.6</fasterxml-jackson.version>
</properties>
14 changes: 14 additions & 0 deletions KinesisConnectorConfiguration.java
@@ -102,6 +102,8 @@ public class KinesisConnectorConfiguration {
"elasticsearchCloudFormationSSHLocation";
public static final String PROP_ELASTICSEARCH_CLOUDFORMATION_CLUSTER_SIZE =
"elasticsearchCloudFormationClusterSize";
+public static final String PROP_FIREHOSE_STREAM_NAME = "streamName";
+public static final String PROP_FIREHOSE_STREAM_REGION = "streamRegion";

// Default Connector App Constants
public static final String DEFAULT_APP_NAME = "KinesisConnector";
@@ -157,6 +159,10 @@ public class KinesisConnectorConfiguration {
public static final long DEFAULT_CLOUDWATCH_BUFFER_TIME = 10 * 1000L;
public static final int DEFAULT_CLOUDWATCH_MAX_QUEUE_SIZE = 10000;

+// Default Amazon Kinesis Firehose Constants
+public static final String DEFAULT_FIREHOSE_STREAM_NAME = "OutputStream";
+public static final String DEFAULT_FIREHOSE_STREAM_REGION = "eu-west-1";

// Default Amazon Elasticsearch Constraints
public static final String DEFAULT_ELASTICSEARCH_CLUSTER_NAME = "elasticsearch";
public static final String DEFAULT_ELASTICSEARCH_ENDPOINT = "localhost";
@@ -236,6 +242,8 @@ public class KinesisConnectorConfiguration {
public final String ELASTICSEARCH_CLOUDFORMATION_CLUSTER_INSTANCE_TYPE;
public final String ELASTICSEARCH_CLOUDFORMATION_SSH_LOCATION;
public final String ELASTICSEARCH_CLOUDFORMATION_CLUSTER_SIZE;
+public final String FIREHOSE_STREAM_NAME;
+public final String FIREHOSE_STREAM_REGION;

/**
* Configure the connector application with any set of properties that are unique to the application. Any
Expand Down Expand Up @@ -349,6 +357,12 @@ public KinesisConnectorConfiguration(Properties properties, AWSCredentialsProvid
ELASTICSEARCH_CLOUDFORMATION_CLUSTER_SIZE =
properties.getProperty(PROP_ELASTICSEARCH_CLOUDFORMATION_CLUSTER_SIZE,
DEFAULT_ELASTICSEARCH_CLOUDFORMATION_CLUSTER_SIZE);
+FIREHOSE_STREAM_NAME =
+properties.getProperty(PROP_FIREHOSE_STREAM_NAME,
+DEFAULT_FIREHOSE_STREAM_NAME);
+FIREHOSE_STREAM_REGION =
+properties.getProperty(PROP_FIREHOSE_STREAM_REGION,
+DEFAULT_FIREHOSE_STREAM_REGION);

// Amazon Kinesis Client Library configuration
WORKER_ID = properties.getProperty(PROP_WORKER_ID, DEFAULT_WORKER_ID);
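For context (not part of the diff): a minimal sketch of how the two new properties might be supplied when building the connector configuration. The class name FirehoseConfigExample and the property values are hypothetical; any key left unset falls back to the DEFAULT_FIREHOSE_* constants added above.

import java.util.Properties;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;

public class FirehoseConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Keys match PROP_FIREHOSE_STREAM_NAME and PROP_FIREHOSE_STREAM_REGION above.
        props.setProperty("streamName", "my-delivery-stream");
        props.setProperty("streamRegion", "us-east-1");

        KinesisConnectorConfiguration config =
                new KinesisConnectorConfiguration(props, new DefaultAWSCredentialsProviderChain());

        System.out.println(config.FIREHOSE_STREAM_NAME);   // my-delivery-stream
        System.out.println(config.FIREHOSE_STREAM_REGION); // us-east-1
    }
}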
103 changes: 103 additions & 0 deletions FirehoseEmitter.java
@@ -0,0 +1,103 @@
package com.amazonaws.services.kinesis.connectors.firehose;

import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
import com.amazonaws.services.kinesis.connectors.UnmodifiableBuffer;
import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsync;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseAsyncClientBuilder;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResponseEntry;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchResult;
import com.amazonaws.services.kinesisfirehose.model.Record;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
* Emitter to connect an Amazon Kinesis stream to an Amazon Kinesis Firehose delivery stream.
*/
public class FirehoseEmitter implements IEmitter<Record> {
private static final Log LOG = LogFactory.getLog(FirehoseEmitter.class);

private final AmazonKinesisFirehoseAsync firehoseClient;
private final String streamName;

public FirehoseEmitter(KinesisConnectorConfiguration configuration) {
// Build the client against the configured region and credentials so that
// FIREHOSE_STREAM_REGION and the supplied credentials provider are actually used.
firehoseClient = AmazonKinesisFirehoseAsyncClientBuilder.standard()
.withRegion(configuration.FIREHOSE_STREAM_REGION)
.withCredentials(configuration.AWS_CREDENTIALS_PROVIDER)
.build();

streamName = configuration.FIREHOSE_STREAM_NAME;
}

/**
* Emit a record buffer
* @param buffer
* The full buffer of records
* @return List<Record> A list of any records that were not accepted by Kinesis Firehose
* @throws IOException
*/
public List<Record> emit(UnmodifiableBuffer<Record> buffer) throws IOException {

List<Record> records = buffer.getRecords();

List<List<Record>> batches = new RecordBatcher().makeBatches(records);

ArrayList<Future<PutRecordBatchResult>> resultFutures = new ArrayList<>();

int i = 0;
for (List<Record> batch : batches) {
LOG.info(String.format("Writing %d records to Firehose stream (batch %d of %d)", batch.size(), ++i, batches.size()));

resultFutures.add(firehoseClient.putRecordBatchAsync(new PutRecordBatchRequest()
.withRecords(batch)
.withDeliveryStreamName(streamName)));
}

List<Record> failures = new ArrayList<>();

for (i = 0; i < resultFutures.size(); i++) {
try {
PutRecordBatchResult res = resultFutures.get(i).get();

if (res.getFailedPutCount() > 0) {
List<PutRecordBatchResponseEntry> entries = res.getRequestResponses();

for (int j = 0; j < entries.size(); j++) {
PutRecordBatchResponseEntry entry = entries.get(j);
if (entry.getErrorCode() != null) {
failures.add(batches.get(i).get(j));
}
}
}
} catch (InterruptedException | ExecutionException e) {
LOG.warn(String.format("Caught %s while putting batch %d, %d records will be failed",
e.getClass().getName(),
i, batches.get(i).size()));
LOG.warn("Exception details: ", e);

// Add this whole batch of records to the failure list
failures.addAll(batches.get(i));
}
}

if (!failures.isEmpty()) {
LOG.warn(String.format("%d records failed", failures.size()));
}
return failures;
}

public void fail(List<Record> records) {
for (Record record : records) {
LOG.error("Record failed: " + record);
}
}

public void shutdown() {
// Shut down the async client so its executor threads do not keep the JVM alive.
firehoseClient.shutdown();
}
}
19 changes: 19 additions & 0 deletions FirehoseTransformer.java
@@ -0,0 +1,19 @@
package com.amazonaws.services.kinesis.connectors.firehose;

import com.amazonaws.services.kinesis.connectors.interfaces.ITransformer;
import com.amazonaws.services.kinesis.connectors.interfaces.ITransformerBase;
import com.amazonaws.services.kinesisfirehose.model.Record;

import java.io.IOException;

/**
* Base transformer for converting application-specific records into Kinesis Firehose records,
* for use with the FirehoseEmitter.
*
* @param <T> the application-specific record type to transform into a Firehose Record
*/
public abstract class FirehoseTransformer<T> implements ITransformerBase<T, Record> {

@Override
public abstract Record fromClass(T record) throws IOException;

}
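To show how the emitter and transformer are intended to fit together, here is a hedged sketch (not part of the PR) of a pipeline that reads UTF-8 strings from a Kinesis stream and forwards them to a Firehose delivery stream. StringToFirehosePipeline, StringTransformer, and the newline framing are hypothetical; IKinesisConnectorPipeline, BasicMemoryBuffer, and AllPassFilter come from the existing connector library, while FirehoseEmitter and FirehoseTransformer come from this change.

package com.amazonaws.services.kinesis.connectors.firehose;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import com.amazonaws.services.kinesis.connectors.BasicMemoryBuffer;
import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration;
import com.amazonaws.services.kinesis.connectors.filters.AllPassFilter;
import com.amazonaws.services.kinesis.connectors.interfaces.IBuffer;
import com.amazonaws.services.kinesis.connectors.interfaces.IEmitter;
import com.amazonaws.services.kinesis.connectors.interfaces.IFilter;
import com.amazonaws.services.kinesis.connectors.interfaces.IKinesisConnectorPipeline;
import com.amazonaws.services.kinesis.connectors.interfaces.ITransformer;
import com.amazonaws.services.kinesis.connectors.interfaces.ITransformerBase;
import com.amazonaws.services.kinesisfirehose.model.Record;

/** Hypothetical pipeline passing UTF-8 strings from a Kinesis stream to a Firehose delivery stream. */
public class StringToFirehosePipeline implements IKinesisConnectorPipeline<String, Record> {

    /** Hypothetical transformer: Kinesis record bytes in, newline-terminated UTF-8 Firehose record out. */
    public static class StringTransformer extends FirehoseTransformer<String>
            implements ITransformer<String, Record> {
        @Override
        public String toClass(com.amazonaws.services.kinesis.model.Record record) {
            return StandardCharsets.UTF_8.decode(record.getData()).toString();
        }

        @Override
        public Record fromClass(String record) {
            return new Record().withData(ByteBuffer.wrap((record + "\n").getBytes(StandardCharsets.UTF_8)));
        }
    }

    @Override
    public IEmitter<Record> getEmitter(KinesisConnectorConfiguration configuration) {
        return new FirehoseEmitter(configuration);
    }

    @Override
    public IBuffer<String> getBuffer(KinesisConnectorConfiguration configuration) {
        return new BasicMemoryBuffer<String>(configuration);
    }

    @Override
    public ITransformerBase<String, Record> getTransformer(KinesisConnectorConfiguration configuration) {
        return new StringTransformer();
    }

    @Override
    public IFilter<String> getFilter(KinesisConnectorConfiguration configuration) {
        return new AllPassFilter<String>();
    }
}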
65 changes: 65 additions & 0 deletions RecordBatcher.java
@@ -0,0 +1,65 @@
package com.amazonaws.services.kinesis.connectors.firehose;

import com.amazonaws.services.kinesisfirehose.model.Record;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Utility class to split a buffer of records into batches of Kinesis Firehose records,
* each containing at most 500 records and at most 4 MB of data, as per the Firehose API limits.
*/
class RecordBatcher {

private static final Log LOG = LogFactory.getLog(RecordBatcher.class);

List<List<Record>> makeBatches(List<Record> records) {
// N.B. the case where a single record is over the byte-size limit is not handled
// here; it will be rejected by the Firehose API and classed as failed when it is
// emitted.

int numRecords = 0;
int byteSize = 0;
List<Record> curBatch = new ArrayList<>();

List<List<Record>> outRecords = new ArrayList<>();

for (Record record : records) {

if (record == null) {
continue;
}

int recSize = record.getData().remaining();

if (numRecords + 1 > 500 || byteSize + recSize > 4194304) {
// add current batch to the list and start a new one
outRecords.add(Collections.unmodifiableList(curBatch));

curBatch = new ArrayList<>();

numRecords = 0;
byteSize = 0;
}

curBatch.add(record);

numRecords++;
byteSize += recSize;
}

if (curBatch.size() > 0) {
outRecords.add(curBatch);
}

LOG.debug(String.format("Built %d batches", outRecords.size()));

return Collections.unmodifiableList(outRecords);
}

}
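A quick hedged sketch of the batching behavior (same package, since RecordBatcher and makeBatches are package-private): with records far below the 4 MB byte limit, the 500-record cap determines the batch boundaries. The class name RecordBatcherDemo is hypothetical.

package com.amazonaws.services.kinesis.connectors.firehose;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import com.amazonaws.services.kinesisfirehose.model.Record;

public class RecordBatcherDemo {
    public static void main(String[] args) {
        // 1,200 one-kilobyte records: the 500-record limit is hit before the 4 MB limit.
        List<Record> records = new ArrayList<>();
        for (int i = 0; i < 1200; i++) {
            records.add(new Record().withData(ByteBuffer.wrap(new byte[1024])));
        }

        List<List<Record>> batches = new RecordBatcher().makeBatches(records);

        System.out.println(batches.size());        // 3
        System.out.println(batches.get(0).size()); // 500
        System.out.println(batches.get(2).size()); // 200
    }
}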