Skip to content

Commit

Permalink
Fixed #16 Added option to configure mapping of fields in SinkRecord t…
Browse files Browse the repository at this point in the history
…o CQL columns
  • Loading branch information
aaruna committed May 4, 2016
1 parent 294ec20 commit c580c87
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 40 deletions.
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ All the others (BLOB, INET, UUID, TIMEUUID, LIST, SET, MAP, CUSTOM, UDT, TUPLE,

## CassandraSink
It stores Kafka SinkRecord in Cassandra tables. Currently, we only support STRUCT type in the SinkRecord.
The STRUCT can have multiple fields with primitive fieldtypes. We assume one-to-one mapping between the column names in the Cassandra sink table and the field names.
The STRUCT can have multiple fields with primitive fieldtypes.
By default, we assume one-to-one mapping between the column names in the Cassandra sink table and the field names.

Say, the SinkRecords has the following STRUCT value
```
Expand All @@ -97,6 +98,23 @@ Say, the SinkRecords has the following STRUCT value

Then the Cassandra table should have the columns - id, username, text

We also support specifying the field name mapping to column names, using the property `cassandra.sink.field.mapping`
Say, the SinkRecords has the following STRUCT value
```
{
'id': 1,
'user': {
'id': 123,
'name': 'Foo',
'email': 'foo@bar.com'
},
'text': 'This is my first tweet'
}
```
and the `cassandra.sink.field.mapping` has the value `{'id': 'id', 'user': {'id': 'uid', 'name': 'username'}, 'text': 'tweet_text'}`
Then the Cassandra table should have the columns - id, uid, username, tweet_text.
Note that since we did not specify any mapping for 'user.email', it is ignored and not inserted in the Cassandra Sink table.

Note: The library does not create the Cassandra tables - users are expected to create those before starting the sink

## Configuration
Expand Down Expand Up @@ -132,6 +150,7 @@ Refer `examples/config` for sample configuration files
|-------- |----------------------------|-----------------------|
| cassandra.sink.route.\<topic_name\> | The table to write the SinkRecords to, \<keyspace\>.\<tableName\> | |
| cassandra.sink.consistency | The consistency level for writes to Cassandra. | LOCAL_QUORUM |
| cassandra.sink.field.mapping | The JSON String mapping field names to column names. | |


## Building from Source
Expand Down
18 changes: 10 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@ libraryDependencies ++= Seq(
"org.joda" % "joda-convert" % "1.8.1",
"org.scalatest" %% "scalatest" % "2.2.6" % "test,it",
"org.mockito" % "mockito-core" % "2.0.34-beta" % "test,it",
"ch.qos.logback" % "logback-classic" % "1.1.3" % "test,it",
CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, minor)) if minor < 11 =>
"org.slf4j" % "slf4j-api" % "1.7.13"
case _ =>
"com.typesafe.scala-logging" %% "scala-logging" % "3.1.0"
}
)
"ch.qos.logback" % "logback-classic" % "1.1.3" % "test,it"
) ++ (CrossVersion.partialVersion(scalaVersion.value) match {
case Some((2, minor)) if minor < 11 => Seq(
"org.slf4j" % "slf4j-api" % "1.7.13"
)
case _ => Seq(
"com.typesafe.scala-logging" %% "scala-logging" % "3.1.0",
"org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.4"
)
})

publishMavenStyle := true

Expand Down
7 changes: 7 additions & 0 deletions src/it/resources/setup.cql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ CREATE TABLE IF NOT EXISTS test.kv (
value int,
PRIMARY KEY (key));

CREATE TABLE IF NOT EXISTS test.fieldmap (
new_key text,
new_value int,
new_nested text,
new_dnested text,
PRIMARY KEY (new_key));

CREATE TABLE test.playlists (
id bigint,
song_order int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@
package com.tuplejump.kafka.connect.cassandra

import scala.collection.JavaConverters._
import scala.util.parsing.json.JSONObject
import org.apache.kafka.connect.data.{Schema, SchemaBuilder, Struct}
import org.apache.kafka.connect.sink.{SinkRecord, SinkTaskContext}

class CassandraSinkTaskSpec extends AbstractFlatSpec {

val topicName = "test_kv_topic"
val tableName = "test.kv"
val config = sinkProperties(Map(topicName -> tableName))

it should "start sink task" in {
val topicName = "test_kv_topic"
val tableName = "test.kv"
val config = sinkProperties(Map(topicName -> tableName))

val sinkTask = new CassandraSinkTask()
val mockContext = mock[SinkTaskContext]

Expand All @@ -38,6 +39,10 @@ class CassandraSinkTaskSpec extends AbstractFlatSpec {
}

it should "save records in cassandra" in {
val topicName = "test_kv_topic"
val tableName = "test.kv"
val config = sinkProperties(Map(topicName -> tableName))

val sinkTask = new CassandraSinkTask()
val mockContext = mock[SinkTaskContext]

Expand All @@ -64,5 +69,76 @@ class CassandraSinkTaskSpec extends AbstractFlatSpec {
rowCount should be(2)
cc.shutdown()
}


it should "save records in cassandra with custom field mapping" in {
val topicName = "test_fieldmap_topic"
val tableName = "test.fieldmap"
val config = sinkProperties(Map(topicName -> tableName))

val sinkTask = new CassandraSinkTask()
val mockContext = mock[SinkTaskContext]

val fieldMapping: JSONObject = JSONObject(Map(
"key" -> "new_key",
"value" -> "new_value",
"nvalue" -> JSONObject(Map(
"blah1" -> "new_nested",
"blah2" -> JSONObject(Map(
"blah2" -> "new_dnested"
))
))
))

sinkTask.initialize(mockContext)
sinkTask.start((config + ("cassandra.sink.field.mapping" -> fieldMapping.toString())).asJava)

val doubleNestedSchema = SchemaBuilder.struct.name("dnestedSchema").version(1)
.field("blah1", Schema.STRING_SCHEMA)
.field("blah2", Schema.STRING_SCHEMA).build
val nestedSchema = SchemaBuilder.struct.name("nestedSchema").version(1)
.field("blah1", Schema.STRING_SCHEMA)
.field("blah2", doubleNestedSchema).build
val valueSchema = SchemaBuilder.struct.name("record").version(1)
.field("key", Schema.STRING_SCHEMA)
.field("value", Schema.INT32_SCHEMA)
.field("nvalue", nestedSchema).build

val dnestedValue1 = new Struct(doubleNestedSchema)
.put("blah1", "dnes_blah1_1")
.put("blah2", "dnes_blah2_1")
val nestedValue1 = new Struct(nestedSchema)
.put("blah1", "nes_blah1_1")
.put("blah2", dnestedValue1)
val value1 = new Struct(valueSchema)
.put("key", "pqr")
.put("value", 15)
.put("nvalue", nestedValue1)

val dnestedValue2 = new Struct(doubleNestedSchema)
.put("blah1", "dnes_blah1_2")
.put("blah2", "dnes_blah2_2")
val nestedValue2 = new Struct(nestedSchema)
.put("blah1", "nes_blah1_2")
.put("blah2", dnestedValue2)
val value2 = new Struct(valueSchema)
.put("key", "abc")
.put("value", 17)
.put("nvalue", nestedValue2)

val record1 = new SinkRecord(topicName, 1, SchemaBuilder.struct.build, "key", valueSchema, value1, 0)
val record2 = new SinkRecord(topicName, 1, SchemaBuilder.struct.build, "key", valueSchema, value2, 0)

sinkTask.put(List(record1, record2).asJavaCollection)

sinkTask.stop()

val cc = CassandraCluster.local
val session = cc.session
val result = session.execute(s"select count(1) from $tableName").one()
val rowCount = result.getLong(0)
rowCount should be(2)
cc.shutdown()
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class CassandraSinkTask extends SinkTask with CassandraTask {
private def write(sc: SinkConfig, byTopic: Iterable[SinkRecord]): Unit = {
// TODO needs ticket: if (byTopic.size > 1) boundWrite(sc, byTopic) else
for (record <- byTopic) {
val query = record.as(sc.schema.namespace)
val query = record.as(sc.schema.namespace, sc.options.fieldMapping)
Try(session.executeAsync(query.cql)) recover { case NonFatal(e) =>
throw new ConnectException(
s"Error executing ${byTopic.size} records for schema '${sc.schema}'.", e)
Expand All @@ -64,7 +64,7 @@ class CassandraSinkTask extends SinkTask with CassandraTask {
private def boundWrite(sc: SinkConfig, byTopic: Iterable[SinkRecord]): Unit = {
val statement = prepare(session, sc)
val futures = for (record <- byTopic) yield {
val query = record.as(sc.schema.namespace)
val query = record.as(sc.schema.namespace, sc.options.fieldMapping)
try {
val bs = statement.bind(query.cql)
session.executeAsync(bs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,12 @@ private[cassandra] object Syntax {
namespace.length >= 3 || namespace.contains(".")
}

def apply(namespace: String, columnNames: List[ColumnName], columnValues: String): SinkQuery = {
val columns = columnNames.mkString(",")
SinkQuery(s"INSERT INTO $namespace($columns) VALUES($columnValues)")
def apply(namespace: String, columnNamesVsValues: Map[ColumnName, String]): SinkQuery = {
val query = columnNamesVsValues.view.map(e => Vector(e._1, e._2)).transpose match {
case columnNames :: columnValues :: Nil =>
s"INSERT INTO ${namespace}(${columnNames.mkString(",")}) VALUES(${columnValues.mkString(",")})"
}
SinkQuery(query)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package com.tuplejump.kafka.connect.cassandra

import scala.collection.immutable
import scala.util.control.NonFatal
import scala.util.parsing.json.JSON
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.connect.connector.Task
import org.apache.kafka.connect.errors.DataException
import com.datastax.driver.core.{TableMetadata, ConsistencyLevel}
import InternalConfig._

Expand Down Expand Up @@ -123,6 +125,9 @@ object TaskConfig {
final val SinkConsistency: Key = "cassandra.sink.consistency"
final val DefaultSinkConsistency = ConsistencyLevel.LOCAL_QUORUM

final val FieldMapping: Key = "cassandra.sink.field.mapping"
final val DefaultFieldMapping = Map.empty[String, String]

/* **** Task config **** */
final val TaskParallelismStrategy: Key = "cassandra.task.parallelism"

Expand Down Expand Up @@ -156,6 +161,10 @@ private[cassandra] object InternalConfig {
def toInt(a: String): Int = a.toInt
def toLong(a: String): Long = a.toLong
def toConsistency(a: String): ConsistencyLevel = ConsistencyLevel.valueOf(a)
def toMap(a: String): Map[String, Any] = JSON.parseFull(a) collect {
case data: Map[_, _] => data.asInstanceOf[Map[String, Any]]
} getOrElse(throw new DataException(s"Field mapping type for '$a' is not supported."))


/** A Cassandra `keyspace.table` to Kafka topic mapping.
*
Expand Down Expand Up @@ -319,15 +328,21 @@ private[cassandra] object InternalConfig {
sealed trait ClusterQueryOptions

/** Settings related for individual queries, can be set per keyspace.table. */
final case class WriteOptions(consistency: ConsistencyLevel) extends ClusterQueryOptions
final case class WriteOptions(consistency: ConsistencyLevel,
fieldMapping: Map[String, Any]) extends ClusterQueryOptions

object WriteOptions {

def apply(config: Map[String,String]): WriteOptions =
WriteOptions(config.valueOr[ConsistencyLevel](
SinkConsistency, toConsistency, DefaultSourceConsistency))
def apply(config: Map[String, String]): WriteOptions = {
WriteOptions(
consistency = config.valueOr[ConsistencyLevel](
SinkConsistency, toConsistency, DefaultSourceConsistency),
fieldMapping = config.valueOr[Map[String, Any]](
FieldMapping, toMap, DefaultFieldMapping
)
)
}
}

/** Settings related for individual queries, can be set per keyspace.table. */
final case class ReadOptions(splitSize: Int,
fetchSize: Int,
Expand Down
86 changes: 73 additions & 13 deletions src/main/scala/com/tuplejump/kafka/connect/cassandra/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.tuplejump.kafka.connect

import org.apache.kafka.connect.data.Field

/** Common package operations. */
package object cassandra {
import java.util.{List => JList, Map => JMap, Date => JDate}
Expand Down Expand Up @@ -69,29 +71,87 @@ package object cassandra {

implicit class SinkRecordOps(record: SinkRecord) {

def as(namespace: String): SinkQuery = {
def as(namespace: String, fieldMapping: Map[String, Any]): SinkQuery = {
val colNamesVsValues: Map[String, String] = {
if (fieldMapping.isEmpty) {
toCqlData
} else {
toCqlData(fieldMapping)
}
}
SinkQuery(namespace, colNamesVsValues)
}

def toCqlData(): (Map[String, String]) = {
val schema = record.valueSchema
val columnNames = schema.asColumnNames
val columnValues = schema.`type`() match {
schema.`type`() match {
case STRUCT =>
val struct: Struct = record.value.asInstanceOf[Struct]
columnNames.map(convert(schema, struct, _)).mkString(",")
case other => throw new DataException(
s"Unable to create insert statement with unsupported value schema type $other.")
schema.fields.asScala.map { field =>
field.name -> convert(schema, record.value.asInstanceOf[Struct], field)
}.toMap
case other =>
throw new DataException(
s"Unable to create insert statement with unsupported value schema type $other.")
}
}

def toCqlData(fieldMapping: Map[String, Any]): Map[String, String] = {
record.valueSchema.`type`() match {
case STRUCT =>
toColNamesVsValues(record.value.asInstanceOf[Struct], fieldMapping)
case other =>
throw new DataException(
s"Unable to create insert statement with unsupported value schema type $other.")
}
}

// scalastyle:off
private def toColNamesVsValues(struct: Struct, fieldMapping: Map[String, Any],
colNameVsValues: Map[String, String] = Map.empty): Map[String, String] = {
lazy val exception = new DataException(s"Mismatch between fieldMapping and Schema")
var result: Map[String, String] = colNameVsValues
struct.schema.fields.asScala.foreach { field =>
val fieldMappingValue = fieldMapping.get(field.name)
field.schema.`type`() match {
case STRUCT =>
fieldMappingValue match {
case Some(value) =>
value match {
case newMap: Map[_, _] =>
result = toColNamesVsValues(struct.get(field).asInstanceOf[Struct],
newMap.asInstanceOf[Map[String, Any]], result)
case _ =>
throw exception
}
case None =>
}
case _ =>
fieldMappingValue match {
case Some(value) =>
value match {
case strValue: String =>
result += (strValue -> convert(field.schema, struct, field))
case _ =>
throw exception
}
case None =>
}
}
}
SinkQuery(namespace, columnNames, columnValues)
result
}
// scalastyle:on

/* TODO support all types. */
def convert(schema: Schema, result: Struct, col: String): AnyRef =
schema.field(col).schema match {
def convert(schema: Schema, result: Struct, field: Field): String =
field.schema match {
case x if x.`type`() == Schema.STRING_SCHEMA.`type`() =>
s"'${result.get(col).toString}'"
s"'${result.get(field).toString}'"
case x if x.name() == Timestamp.LOGICAL_NAME =>
val time = Timestamp.fromLogical(x, result.get(col).asInstanceOf[JDate])
val time = Timestamp.fromLogical(x, result.get(field).asInstanceOf[JDate])
s"$time"
case y =>
result.get(col)
String.valueOf(result.get(field))
}

def asColumnNames: List[ColumnName] =
Expand Down
Loading

0 comments on commit c580c87

Please sign in to comment.