Merge pull request #25 from Shiti/develop
Example using source and sink
Shiti committed May 3, 2016
2 parents d523cd6 + 3af19d6 commit b9c9fe8
Showing 8 changed files with 223 additions and 38 deletions.
10 changes: 9 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -16,4 +16,12 @@ project/plugins/project/
.scala_dependencies
.worksheet

.idea/

examples/cassandra/
examples/setup.cql
examples/kafka_2.11-0.9.0.1/
examples/cassandraLog
examples/zkLog
examples/serverLog
examples/connectLog
93 changes: 56 additions & 37 deletions README.md
@@ -1,27 +1,48 @@
# Kafka Connect Cassandra [![GitHub license](https://img.shields.io/badge/license-Apache%20V2-green.svg)](https://github.com/tuplejump/kafka-connect-cassandra/blob/master/LICENSE) [![Build Status](https://travis-ci.org/tuplejump/kafka-connect-cassandra.svg?branch=master)](https://travis-ci.org/tuplejump/kafka-connect-cassandra?branch=master)
Kafka Connect Cassandra Connector. This project includes source & sink connectors.

## Release Status
Experimental phase.

## Table of Contents

* [Usage](#usage)
* [Running Examples](#running-examples)
* [CassandraSource](#cassandrasource)
* [Supported CQL Types](#cql-types-supported)
* [CassandraSink](#cassandrasink)
* [Configuration](#configuration)
* [Connect Properties](#connect-properties-for-both-source-and-sink)
* [Source Connect Properties](#source-connect-properties)
* [Sink Connect Properties](#sink-connect-properties)
* [Building From Source](#building-from-source)

## Usage
The project can be used in two ways:

1. as a library in another project,

"kafka-connect-cassandra" is published on Maven Central by Tuplejump. It can be defined as a dependency in the build file.
For example, with SBT,

```
libraryDependencies += "com.tuplejump" %% "kafka-connect-cassandra" % "0.0.7"
```

2. using the jar with Kafka Connect

Download the [jar](http://downloads.tuplejump.com/kafka-connect-cassandra-assembly-0.0.7.jar) and copy it to `KAFKA_HOME/libs`.

## Running Examples
The example uses CassandraSource and loads data from `demo.event_store`. This data is then saved in another table `demo.event_store_sink` using CassandraSink.
To run the example, execute `examples/start.sh` in the project directory.

This script starts Cassandra, Zookeeper and the Kafka server, and then launches Kafka Connect using the source and sink configuration specified in `examples/config`.
Once the script has completed, the data from `demo.event_store` will be available in `demo.event_store_sink`. You can insert a few rows into `demo.event_store` to see this.

To stop the processes started for running the example, execute `examples/stop.sh`.

Note: When the example is run repeatedly, an error is thrown that the topic `demo` already exists, but it does not block execution.
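To see the flow end to end, you can insert a row into the source table and then query the sink table from cqlsh. The statements below are a sketch (the row values are made up; the column layout follows `examples/setup.cql`):

```
INSERT INTO demo.event_store (app_id, event_type, subscription_type, event_ts)
VALUES ('website', 'purchase', 'annual', 1457713341000);

-- after the next source poll, the row should show up in the sink table
SELECT * FROM demo.event_store_sink
WHERE app_id = 'website' AND event_type = 'purchase';
```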

## CassandraSource
It polls Cassandra with a specified query. Using this, data can be fetched from Cassandra in two modes:
@@ -59,36 +80,28 @@ Here, `previousTime()` and `currentTime()` are replaced prior to fetching the da
| TIMESTAMP | TIMESTAMP |
| VARINT | INT64 |

All the others (BLOB, INET, UUID, TIMEUUID, LIST, SET, MAP, CUSTOM, UDT, TUPLE, SMALLINT, TINYINT, DATE, TIME) are currently NOT supported.

## CassandraSink
It stores Kafka SinkRecords in Cassandra tables. Currently, only the STRUCT type in the SinkRecord is supported.
The STRUCT can have multiple fields with primitive field types. A one-to-one mapping is assumed between the column names in the Cassandra sink table and the field names.

Say the SinkRecord has the following STRUCT value:
```
{
  'id': 1,
  'username': 'user1',
  'text': 'This is my first tweet'
}
```

Then the Cassandra table should have the columns: id, username and text.

Note: The library does not create the Cassandra tables - users are expected to create those before starting the sink.
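Since the sink does not create tables, a table matching the STRUCT value above has to exist beforehand. A minimal sketch (the keyspace/table names and the choice of `id` as the primary key are illustrative assumptions):

```
CREATE TABLE keyspace1.table1 (
    id int PRIMARY KEY,  -- matches the 'id' field
    username text,       -- matches the 'username' field
    text text            -- matches the 'text' field
);
```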
## Configuration

Refer to `examples/config` for sample configuration files.


### Connect Properties (for both Source and Sink)
@@ -114,7 +127,6 @@ cassandra.sink.route.topic1=keyspace1.table1
| cassandra.source.route.\<topic_name\> | The Select Query to get the data. (Refer CassandraSource documentation for more details) | |
| cassandra.source.poll.interval | Frequency in ms to poll for new data in each table. | 60000 |
| cassandra.source.fetch.size | Number of CQL rows to fetch in a single round-trip to Cassandra. | 1000 |
### Sink Connect Properties
| name | description | default value |
|-------- |----------------------------|-----------------------|
@@ -125,10 +137,17 @@ cassandra.sink.route.topic1=keyspace1.table1
## Building from Source
The project requires SBT to build from source. Execute the following commands in the project directory,

```
sbt package
sbt assembly // to generate the jar with all the dependencies
```

This will build against Scala 2.11.7 by default. You can override this with:

```
sbt -Dscala.version=2.10.6 assembly
```

Or, to build against multiple Scala versions,
```
sbt +package
```
4 changes: 4 additions & 0 deletions examples/config/bulk-source.properties
@@ -0,0 +1,4 @@
name=cassandra-source-bulk
connector.class=com.tuplejump.kafka.connect.cassandra.CassandraSource
topic=demo
cassandra.source.route.demo=select * from demo.event_store
5 changes: 5 additions & 0 deletions examples/config/sink.properties
@@ -0,0 +1,5 @@
name=cassandra-sink-connector
connector.class=com.tuplejump.kafka.connect.cassandra.CassandraSink
tasks.max=1
topics=demo
cassandra.sink.route.demo=demo.event_store_sink
40 changes: 40 additions & 0 deletions examples/data.csv
@@ -0,0 +1,40 @@
website,annual,purchase,1457713158000
iphone,monthly,others,1457713159000
android,trial,purchase,1457713160000
website,monthly,renewal,1457713161000
website,monthly,others,1457713162000
android,monthly,feedback,1457713163000
iphone,annual,enquiry,1457713164000
iphone,monthly,purchase,1457713165000
website,trial,others,1457713166000
android,none,feedback,1457713167000
android,annual,renewal,1457713168000
iphone,none,others,1457713169000
android,annual,purchase,1457713170000
website,trial,renewal,1457713171000
iphone,none,others,1457713172000
android,trial,purchase,1457713173000
iphone,annual,others,1457713174000
android,monthly,renewal,1457713175000
iphone,monthly,renewal,1457713176000
website,trial,feedback,1457713177000
iphone,monthly,renewal,1457713321000
android,annual,feedback,1457713322000
android,none,feedback,1457713323000
website,none,enquiry,1457713324000
website,none,feedback,1457713325000
website,monthly,enquiry,1457713326000
website,trial,enquiry,1457713327000
android,none,renewal,1457713328000
website,monthly,purchase,1457713329000
iphone,annual,purchase,1457713330000
website,annual,others,1457713331000
website,monthly,enquiry,1457713332000
android,trial,purchase,1457713333000
iphone,none,enquiry,1457713334000
iphone,annual,enquiry,1457713335000
website,none,feedback,1457713336000
iphone,trial,enquiry,1457713337000
website,monthly,renewal,1457713338000
android,none,enquiry,1457713339000
android,monthly,purchase,1457713340000
23 changes: 23 additions & 0 deletions examples/setup.cql
@@ -0,0 +1,23 @@
CREATE KEYSPACE IF NOT EXISTS demo WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

USE demo;

CREATE TABLE IF NOT EXISTS demo.event_store(
app_id text,
event_type text,
subscription_type text,
event_ts timestamp,
PRIMARY KEY((app_id, event_type), event_ts)
);

CREATE TABLE IF NOT EXISTS demo.event_store_sink(
app_id text,
event_type text,
subscription_type text,
event_ts timestamp,
PRIMARY KEY((app_id, event_type), event_ts)
);

COPY event_store(app_id,subscription_type,event_type,event_ts) FROM 'data.csv' ;
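After `start.sh` has applied the script above, the import can be sanity-checked from cqlsh. A sketch, assuming the COPY succeeded; the expected count matches the 40 rows in `examples/data.csv`:

```
-- expect 40, one row per line of data.csv
SELECT COUNT(*) FROM demo.event_store;
```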


79 changes: 79 additions & 0 deletions examples/start.sh
@@ -0,0 +1,79 @@
#!/usr/bin/env bash

DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"

waitForStart(){
until grep -q "$2" "$3"
do
echo "waiting for $1 to start"
sleep 5
done
}

if [ ! -d "$KAFKA_HOME" ]
then
if [ ! -f kafka.tgz ]
then
echo "KAFKA_HOME is not configured. Downloading Kafka."
wget "http://archive.apache.org/dist/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz" -O kafka.tgz
fi
tar -xzf kafka.tgz
KAFKA_HOME=$DIR/$(tar tfz kafka.tgz --exclude '*/*')
fi

if [ ! -d "$CASSANDRA_HOME" ]
then
if [ ! -f cassandra.tar.gz ]
then
echo "CASSANDRA_HOME is not configured. Downloading Cassandra."
wget "http://archive.apache.org/dist/cassandra/3.1.1/apache-cassandra-3.1.1-bin.tar.gz" -O cassandra.tar.gz
fi
mkdir -p cassandra
tar -xzf cassandra.tar.gz -C cassandra --strip-components=1
CASSANDRA_HOME=cassandra
fi

if [ ! -f "kafka-connect-cassandra-assembly-0.0.7.jar" ]
then
echo "didn't find jar. Downloading"
wget "http://downloads.tuplejump.com/kafka-connect-cassandra-assembly-0.0.7.jar"
fi

cp kafka-connect-cassandra-assembly-0.0.7.jar "${KAFKA_HOME}/libs/"

##update path of data file
sed -i "s;'data.csv';'${DIR}/data.csv';" "${DIR}/setup.cql"

##start cassandra
cd ${CASSANDRA_HOME}

bin/cassandra -p ${DIR}/'demo.pid' > ${DIR}/cassandraLog

waitForStart "cassandra" "state jump to NORMAL" "${DIR}/cassandraLog"

##setup schema
bin/cqlsh -f "${DIR}/setup.cql"

cd ${KAFKA_HOME}

## start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties > ${DIR}/zkLog &
ZK_PID=$!

waitForStart "zookeeper" "binding to port" "${DIR}/zkLog"
echo -n " "${ZK_PID} >> ${DIR}/demo.pid

## start Kafka server
bin/kafka-server-start.sh config/server.properties > ${DIR}/serverLog &
KAFKA_PID=$!

waitForStart "kafka server" "Awaiting socket connections" "${DIR}/serverLog"
echo -n " "${KAFKA_PID} >> ${DIR}/demo.pid

# create topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic demo

bin/connect-standalone.sh config/connect-standalone.properties ${DIR}/config/bulk-source.properties ${DIR}/config/sink.properties > ${DIR}/connectLog &
CONNECT_PID=$!

echo -n " "${CONNECT_PID} >> ${DIR}/demo.pid
7 changes: 7 additions & 0 deletions examples/stop.sh
@@ -0,0 +1,7 @@
#!/usr/bin/env bash

if [ -f "demo.pid" ]
then
kill -9 $(cat demo.pid)
rm demo.pid
fi
