
Commit

add sink
shikhar committed Sep 1, 2016
1 parent f2eee80 commit e168f83
Showing 13 changed files with 769 additions and 13 deletions.
145 changes: 136 additions & 9 deletions README.rst
@@ -1,17 +1,150 @@
**kafka-connect-dynamodb** is a `Kafka Connector <http://kafka.apache.org/documentation.html#connect>`_ for loading data from and to Amazon DynamoDB.
**kafka-connect-dynamodb** is a `Kafka Connector <http://kafka.apache.org/documentation.html#connect>`_ for loading data to and from Amazon DynamoDB.

It is implemented using the AWS Java SDK for DynamoDB.
For authentication, the `DefaultAWSCredentialsProviderChain <http://docs.aws.amazon.com/java-sdk/latest/developer-guide/credentials.html#id6>`_ is used.

Sink Connector
==============

Example configuration
---------------------

Ingest the ``orders`` topic to a DynamoDB table of the same name in the specified region::

name=dynamodb-sink-test
topics=orders
connector.class=dynamok.sink.DynamoDbSinkConnector
region=us-west-2
ignore.record.key=true

Record conversion
-----------------

Refer to `DynamoDB Data Types <http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypes>`_.

At the top level, the Kafka record key or value must convert to the DynamoDB ``Map`` data type;
otherwise, the ``top.key.attribute`` or ``top.value.attribute`` configuration option (as applicable) must be set,
so that the converted value can be hoisted into a DynamoDB record under that attribute.
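
For example, a hypothetical configuration (the topic and attribute names are illustrative, not from the project) that hoists a non-map record key under an ``id`` attribute and a non-map record value under a ``payload`` attribute::

    name=dynamodb-sink-test
    topics=events
    connector.class=dynamok.sink.DynamoDbSinkConnector
    region=us-west-2
    top.key.attribute=id
    top.value.attribute=payload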

Schema present
^^^^^^^^^^^^^^

====================================================  ========
Connect Schema.Type                                   DynamoDB
====================================================  ========
null                                                  Null
INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, Decimal  Number
BOOL                                                  Boolean
BYTES                                                 Binary
STRING                                                String
ARRAY                                                 List
MAP [1], STRUCT                                       Map
====================================================  ========

[1] Map keys must be primitive types, and cannot be optional.
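
For example, a minimal sketch (not part of the connector; the schema and field names are hypothetical) of converting a ``STRUCT`` with the connector's ``AttributeValueConverter``::

    import com.amazonaws.services.dynamodbv2.model.AttributeValue;
    import dynamok.sink.AttributeValueConverter;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    public class SchemaConversionExample {
        public static void main(String[] args) {
            // Hypothetical "Order" schema: an INT64 id and a STRING buyer
            Schema orderSchema = SchemaBuilder.struct().name("Order")
                    .field("id", Schema.INT64_SCHEMA)
                    .field("buyer", Schema.STRING_SCHEMA)
                    .build();
            Struct order = new Struct(orderSchema).put("id", 42L).put("buyer", "alice");

            // STRUCT -> DynamoDB Map; INT64 -> Number, STRING -> String, per the table above
            AttributeValue converted = AttributeValueConverter.toAttributeValue(orderSchema, order);
            System.out.println(converted.getM());
        }
    }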

Schemaless
^^^^^^^^^^

===============================================================================  ==========
Java                                                                             DynamoDB
===============================================================================  ==========
null                                                                             Null
Number (i.e. Byte, Short, Integer, Long, Float, Double, BigInteger, BigDecimal)  Number
Boolean                                                                          Boolean
byte[], ByteBuffer                                                               Binary
String                                                                           String
List                                                                             List
Empty set [1]                                                                    Null
Set<String>                                                                      String Set
Set<Number>                                                                      Number Set
Set<byte[]>, Set<ByteBuffer>                                                     Binary Set
Map [2]                                                                          Map
===============================================================================  ==========


Any other data type will cause the connector to fail.

[1] It is not possible to determine the element type of an empty set.

[2] Map keys must be primitive types, and cannot be optional.
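
Similarly, a minimal sketch (with made-up data, not part of the connector) of the schemaless conversion path::

    import com.amazonaws.services.dynamodbv2.model.AttributeValue;
    import dynamok.sink.AttributeValueConverter;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;

    public class SchemalessConversionExample {
        public static void main(String[] args) {
            Map<String, Object> order = new HashMap<>();
            order.put("id", 42L);                                            // Number -> DynamoDB Number
            order.put("buyer", "alice");                                     // String -> DynamoDB String
            order.put("tags", new HashSet<>(Arrays.asList("new", "gift"))); // Set<String> -> String Set

            // The Map itself becomes a DynamoDB Map, with each entry converted as above
            AttributeValue converted = AttributeValueConverter.toAttributeValueSchemaless(order);
            System.out.println(converted.getM());
        }
    }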

Configuration options
---------------------

``region``
AWS region for the destination DynamoDB tables.

* Type: string
* Default: ""
* Importance: high

``kafka.attributes``
Trio of ``topic,partition,offset`` attribute names to include in records; set to empty to omit these attributes.

* Type: list
* Default: [kafka_topic, kafka_partition, kafka_offset]
* Importance: high

``table.format``
Format string for the destination DynamoDB table name; use ``${topic}`` as a placeholder for the source topic (see the example below).

* Type: string
* Default: "${topic}"
* Importance: high
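
For example (an illustrative value)::

    table.format=kafka_${topic}

With this setting, records from the ``orders`` topic would be written to a DynamoDB table named ``kafka_orders``.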

``ignore.record.key``
Whether to ignore the Kafka record key when preparing the DynamoDB record.

* Type: boolean
* Default: false
* Importance: medium

``ignore.record.value``
Whether to ignore the Kafka record value when preparing the DynamoDB record.

* Type: boolean
* Default: false
* Importance: medium

``max.retries``
The maximum number of times to retry on errors before failing the task.

* Type: int
* Default: 10
* Importance: medium

``retry.backoff.ms``
The time in milliseconds to wait following an error before a retry attempt is made.

* Type: int
* Default: 3000
* Importance: medium

``top.key.attribute``
DynamoDB attribute name to use for the record key. Leave empty if no top-level envelope attribute is desired.

* Type: string
* Default: ""
* Importance: medium

``top.value.attribute``
DynamoDB attribute name to use for the record value. Leave empty if no top-level envelope attribute is desired.

* Type: string
* Default: ""
* Importance: medium


Source Connector
================

Example configuration
---------------------

This will attempt to ingest all DynamoDB tables in the specified region, to Kafka topics with the same name as the source table::
Ingest all DynamoDB tables in the specified region, to Kafka topics with the same name as the source table::

name=mytest
name=dynamodb-source-test
connector.class=dynamok.source.DynamoDbSourceConnector
region=us-west-2

@@ -55,9 +188,3 @@ Configuration options

* Type: string
* Importance: medium


Sink Connector
==============

TODO
5 changes: 5 additions & 0 deletions sink-quickstart.properties
@@ -0,0 +1,5 @@
name=dynamodb-sink-test
topics=orders
connector.class=dynamok.sink.DynamoDbSinkConnector
region=us-west-2
ignore.record.key=true
3 changes: 3 additions & 0 deletions source-quickstart.properties
@@ -0,0 +1,3 @@
name=dynamodb-source-test
connector.class=dynamok.source.DynamoDbSourceConnector
region=us-west-2
12 changes: 12 additions & 0 deletions src/main/java/dynamok/Version.java
@@ -0,0 +1,12 @@
package dynamok;

public class Version {

// TODO pull this in from a packaged resource controlled by the build
private static final String VERSION = "0.1.0-SNAPSHOT";

public static String get() {
return VERSION;
}

}
162 changes: 162 additions & 0 deletions src/main/java/dynamok/sink/AttributeValueConverter.java
@@ -0,0 +1,162 @@
package dynamok.sink;

import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;

import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Converts Kafka Connect record keys and values into DynamoDB {@link AttributeValue}s:
 * {@link #toAttributeValue(Schema, Object)} handles records that carry a Connect schema,
 * and {@link #toAttributeValueSchemaless(Object)} handles schemaless records.
 */
public class AttributeValueConverter {

public static final AttributeValue NULL_VALUE = new AttributeValue().withNULL(true);

public static AttributeValue toAttributeValue(Schema schema, Object value) {
if (value == null) {
if (!schema.isOptional()) {
throw new ConnectException("null value for non-optional schema");
}
return NULL_VALUE;
}
if (schema.name() != null && schema.name().equals(Decimal.LOGICAL_NAME)) {
return new AttributeValue().withN(value.toString());
}
switch (schema.type()) {
case INT8:
case INT16:
case INT32:
case INT64:
case FLOAT32:
case FLOAT64:
return new AttributeValue().withN(value.toString());
case BOOLEAN:
return new AttributeValue().withBOOL((boolean) value);
case STRING:
return new AttributeValue().withS((String) value);
case BYTES:
return new AttributeValue().withB(toByteBuffer(value));
case ARRAY: {
return new AttributeValue().withL(
((List<?>) value).stream()
.map(item -> toAttributeValue(schema.valueSchema(), item))
.collect(Collectors.toList())
);
}
case MAP: {
if (schema.keySchema().isOptional()) {
throw new ConnectException("MAP key schema must not be optional");
}
if (!schema.keySchema().type().isPrimitive()) {
throw new ConnectException("MAP key schema must be of primitive type");
}
final Map<?, ?> sourceMap = (Map) value;
final Map<String, AttributeValue> attributesMap = new HashMap<>(sourceMap.size());
for (Map.Entry<?, ?> e : sourceMap.entrySet()) {
attributesMap.put(
primitiveAsString(e.getKey()),
toAttributeValue(schema.valueSchema(), e.getValue())
);
}
return new AttributeValue().withM(attributesMap);
}
case STRUCT: {
final Struct struct = (Struct) value;
final List<Field> fields = schema.fields();
final Map<String, AttributeValue> attributesMap = new HashMap<>(fields.size());
for (Field field : fields) {
attributesMap.put(field.name(), toAttributeValue(field.schema(), struct.get(field)));
}
return new AttributeValue().withM(attributesMap);
}
default:
throw new ConnectException("Unknown Schema.Type: " + schema.type());
}
}

public static AttributeValue toAttributeValueSchemaless(Object value) {
if (value == null) {
return NULL_VALUE;
}
if (value instanceof Number) {
return new AttributeValue().withN(value.toString());
}
if (value instanceof Boolean) {
return new AttributeValue().withBOOL((Boolean) value);
}
if (value instanceof String) {
return new AttributeValue().withS((String) value);
}
if (value instanceof byte[] || value instanceof ByteBuffer) {
return new AttributeValue().withB(toByteBuffer(value));
}
if (value instanceof List) {
// We could have treated it as NS/BS/SS if the list is homogeneous and of a compatible type, but we can't know that for an empty list
return new AttributeValue().withL(
((List<?>) value).stream()
.map(AttributeValueConverter::toAttributeValueSchemaless)
.collect(Collectors.toList())
);
}
if (value instanceof Set) {
final Set<?> set = (Set) value;
if (set.isEmpty()) {
return NULL_VALUE;
}
final Object firstItem = set.iterator().next();
if (firstItem instanceof String) {
return new AttributeValue().withSS((Set<String>) set);
}
if (firstItem instanceof Number) {
return new AttributeValue().withNS(set.stream().map(Object::toString).collect(Collectors.toSet()));
}
if (firstItem instanceof byte[] || firstItem instanceof ByteBuffer) {
return new AttributeValue().withBS(set.stream().map(AttributeValueConverter::toByteBuffer).collect(Collectors.toSet()));
}
throw new ConnectException("Unsupported Set element type: " + firstItem.getClass());
}
if (value instanceof Map) {
final Map<?, ?> sourceMap = (Map) value;
final Map<String, AttributeValue> attributesMap = new HashMap<>(sourceMap.size());
for (Map.Entry<?, ?> e : sourceMap.entrySet()) {
attributesMap.put(
primitiveAsString(e.getKey()),
toAttributeValueSchemaless(e.getValue())
);
}
return new AttributeValue().withM(attributesMap);
}
throw new ConnectException("Unsupported value type: " + value.getClass());
}

private static String primitiveAsString(Object value) {
if (value instanceof Number || value instanceof Boolean || value instanceof String) {
return value.toString();
}
if (value instanceof byte[]) {
return Base64.getEncoder().encodeToString((byte[]) value);
} else if (value instanceof ByteBuffer) {
// Base64-encode the buffer's remaining bytes without consuming the caller's buffer
final byte[] bytes = new byte[((ByteBuffer) value).remaining()];
((ByteBuffer) value).duplicate().get(bytes);
return Base64.getEncoder().encodeToString(bytes);
}
throw new ConnectException("Not a primitive: " + value.getClass());
}

private static ByteBuffer toByteBuffer(Object bytesValue) {
if (bytesValue instanceof byte[]) {
return ByteBuffer.wrap((byte[]) bytesValue);
} else if (bytesValue instanceof ByteBuffer) {
return ((ByteBuffer) bytesValue);
} else {
throw new ConnectException("Invalid bytes value of type: " + bytesValue.getClass());
}
}

}
