diff --git a/LICENSE.md b/LICENSE.md
index 08c7b2c..277eddb 100644
--- a/LICENSE.md
+++ b/LICENSE.md
@@ -1,4 +1,4 @@
-##License
+## License
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
diff --git a/com.ibm.streamsx.messaging/.classpath b/com.ibm.streamsx.messaging/.classpath
index 540b7d2..c8253e9 100644
--- a/com.ibm.streamsx.messaging/.classpath
+++ b/com.ibm.streamsx.messaging/.classpath
@@ -6,10 +6,8 @@
-
-
@@ -18,8 +16,10 @@
-
-
+
+
+
+
diff --git a/com.ibm.streamsx.messaging/.gitignore b/com.ibm.streamsx.messaging/.gitignore
index c138ea5..baaee37 100644
--- a/com.ibm.streamsx.messaging/.gitignore
+++ b/com.ibm.streamsx.messaging/.gitignore
@@ -1,6 +1,6 @@
/output
-com.ibm.streamsx.messaging.kafka
impl/lib
+com.ibm.streamsx.messaging.kafka
opt/downloaded
/target
/.externalToolBuilders
diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/AttributeHelper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/AttributeHelper.java
index ed6531f..664c697 100644
--- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/AttributeHelper.java
+++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/AttributeHelper.java
@@ -81,7 +81,8 @@ void setValue(OutputTuple otup, byte[] value) {
otup.setString(name, new String(value, CS));
else
otup.setBlob(name, ValueFactory.newBlob(value));
- }
+ }
+
String getString(Tuple tuple) throws IOException {
if(!isAvailable) return null;
if(isString)
diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaBaseOper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaBaseOper.java
index 1bf9b6f..729e291 100644
--- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaBaseOper.java
+++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaBaseOper.java
@@ -37,6 +37,7 @@ public abstract class KafkaBaseOper extends AbstractOperator {
messageAH = new AttributeHelper("message");
protected List<String> topics = new ArrayList<String>();
protected KafkaClient client = null;
+ protected SimpleConsumerClient simpleClient = null;
private final Logger trace = Logger.getLogger(KafkaBaseOper.class
.getCanonicalName());
@@ -70,8 +71,15 @@ public void initSchema(StreamSchema ss) throws Exception {
supportedTypes.remove(MetaType.BLOB);
topicAH.initialize(ss, false, supportedTypes);
- trace.log(TraceLevel.INFO, "Creating client");
- client = new KafkaClient(topicAH, keyAH, messageAH, finalProperties);
+
+
+ OperatorContext operContext = getOperatorContext();
+		if (!operContext.getParameterNames().contains("partition")) {
+ //we don't need to create this client if using simpleConsumerClient
+ trace.log(TraceLevel.INFO, "Creating client");
+ client = new KafkaClient(topicAH, keyAH, messageAH, finalProperties);
+ }
+
}
@Parameter(cardinality = -1, optional = true, description = "Specify a Kafka property \\\"key=value\\\" form. "
@@ -94,8 +102,9 @@ public void setPropertiesFile(String value) {
}
public String getPropertiesFile() {
-
- File file = new File(propertiesFile);
+ trace.log(TraceLevel.TRACE, "Properties file: " + propertiesFile);
+ if (propertiesFile == null) return null;
+ File file = new File(propertiesFile);
// if the properties file is relative, the path is relative to the application directory
if (!file.isAbsolute())
@@ -105,12 +114,12 @@ public String getPropertiesFile() {
return propertiesFile;
}
- @Parameter(optional = true, description = "Name of the attribute for the message. This attribute is required. Default is \\\"message\\\"")
+ @Parameter(optional = true, description = "Name of the attribute for the message. This attribute is required. Default is \\\"message\\\".")
public void setMessageAttribute(String value) {
messageAH.setName(value);
}
- @Parameter(optional = true, description = "Name of the attribute for the key. Default is \\\"key\\\"")
+ @Parameter(optional = true, description = "Name of the attribute for the key. Default is \\\"key\\\".")
public void setKeyAttribute(String value) {
keyAH.setName(value);
}
diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSource.java
index abd83a4..f0c2668 100644
--- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSource.java
+++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSource.java
@@ -6,6 +6,7 @@
package com.ibm.streamsx.messaging.kafka;
+import java.io.IOException;
import java.util.List;
import java.util.logging.Logger;
@@ -18,17 +19,22 @@
import com.ibm.streams.operator.model.OutputPorts;
import com.ibm.streams.operator.model.Parameter;
import com.ibm.streams.operator.model.PrimitiveOperator;
+import com.ibm.streams.operator.state.Checkpoint;
import com.ibm.streams.operator.state.ConsistentRegionContext;
+import com.ibm.streams.operator.state.StateHandler;
@OutputPorts(@OutputPortSet(cardinality=1, optional=false,
description="Messages received from Kafka are sent on this output port."))
@PrimitiveOperator(name=KafkaSource.OPER_NAME, description=KafkaSource.DESC)
@Icons(location16="icons/KafkaConsumer_16.gif", location32="icons/KafkaConsumer_32.gif")
-public class KafkaSource extends KafkaBaseOper {
+public class KafkaSource extends KafkaBaseOper implements StateHandler {
static final String OPER_NAME = "KafkaConsumer";
private int threadsPerTopic = 1;
-
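+	// Configuration for the SimpleConsumerClient; these fields are only used when the partition parameter is specified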
+ private int a_partition = -1;
+ private int triggerCount = -1;
+ private int leaderConnectionRetries = 3;
+ private int connectionRetryInterval = 1000;
private static Logger trace = Logger.getLogger(KafkaSource.class.getName());
//consistent region checks
@@ -36,28 +42,79 @@ public class KafkaSource extends KafkaBaseOper {
public static void checkInConsistentRegion(OperatorContextChecker checker) {
ConsistentRegionContext consistentRegionContext =
checker.getOperatorContext().getOptionalContext(ConsistentRegionContext.class);
-
+ OperatorContext operContext = checker.getOperatorContext();
+
if(consistentRegionContext != null ) {
- checker.setInvalidContext( OPER_NAME + " operator cannot be used inside a consistent region.",
- new String[] {});
+ //checker.setInvalidContext( OPER_NAME + " operator cannot be used inside a consistent region.",
+ // new String[] {});
+ if (!operContext.getParameterNames().contains("partition")){
+ checker.setInvalidContext("The partition parameter must be specified in consistent regions.", new String[] {});
+ }
}
}
-
+
+ //simple consumer client checks
+ @ContextCheck(runtime = false, compile = true)
+ public static void checkCompileCompatability(OperatorContextChecker checker) {
+ OperatorContext operContext = checker.getOperatorContext();
+
+ if (!operContext.getParameterNames().contains("propertiesFile")
+ && !operContext.getParameterNames().contains("kafkaProperty")){
+ checker.setInvalidContext("Missing properties: Neither propertiesFile nor kafkaProperty parameters are set. At least one must be set.",new String[] {});
+
+ }
+
+ }
+
+ //simple consumer client checks
+ @ContextCheck(runtime = true, compile = false)
+ public static void checkRuntimeCompatability(OperatorContextChecker checker) {
+ OperatorContext operContext = checker.getOperatorContext();
+
+ if (operContext.getParameterNames().contains("partition")){
+
+ if (operContext.getParameterValues("topic").size() > 1){
+ checker.setInvalidContext("Invalid topic parameter: Only one topic can be specified when the partition parameter is set.", new String[] {});
+ throw new IllegalArgumentException("Invalid topic parameter: Only one topic can be specified when the partition parameter is set.");
+ }
+
+ }
+
+ }
+
@Override
public void initialize(OperatorContext context)
throws Exception {
super.initialize(context);
super.initSchema(getOutput(0).getStreamSchema());
-
+
+
+ getOutput(0);
if(threadsPerTopic < 1)
- throw new IllegalArgumentException("Number of threads per topic cannot be less than one: " + threadsPerTopic);
+ throw new IllegalArgumentException("Number of threads per topic cannot be less than one: " + threadsPerTopic);
+ ConsistentRegionContext crContext = getOperatorContext().getOptionalContext(ConsistentRegionContext.class);
+ if( crContext != null){
+
+ if (a_partition == -1){
+			throw new IllegalArgumentException("The partition parameter must be specified when the operator is in a consistent region.");
+ }
+
+ }
}
@Override
public void allPortsReady() throws Exception {
//initialize the client
- trace.log(TraceLevel.INFO, "Initializing client");
- client.initConsumer(getOutput(0), getOperatorContext().getThreadFactory(), topics, threadsPerTopic);
+ trace.log(TraceLevel.INFO, "Initializing client");
+ if(a_partition >= 0){
+ trace.log(TraceLevel.INFO, "Using simple consumer client.");
+ simpleClient = new SimpleConsumerClient(topics.get(0), a_partition, keyAH, messageAH, finalProperties, triggerCount, leaderConnectionRetries, connectionRetryInterval);
+ simpleClient.initialize(getOperatorContext());
+ simpleClient.allPortsReady();
+ } else {
+ trace.log(TraceLevel.INFO, "Using high level consumer client.");
+ client.initConsumer(getOutput(0), getOperatorContext().getThreadFactory(), topics, threadsPerTopic);
+ }
}
@Parameter(name="threadsPerTopic", optional=true,
@@ -70,15 +127,73 @@ public void setThreadsPerTopic(int value) {
public void setTopic(List<String> values) {
if(values!=null)
topics.addAll(values);
- }
+ }
+
+ @Parameter(name="partition", optional=true,
+			description="Partition to subscribe to. This parameter must be set when the operator is in a consistent region.")
+ public void setPartition(int value) {
+ this.a_partition = value;
+ }
+
+ @Parameter(name="triggerCount", optional=true,
+			description="Number of messages between checkpoints in a consistent region. This is only relevant for operator-driven checkpointing.")
+ public void setTriggerCount(int value) {
+ this.triggerCount = value;
+ }
+
+ @Parameter(name="leaderConnectionRetries", optional=true,
+			description="Number of attempts to find the lead Kafka broker before giving up. This applies both when first locating a broker and when a lead broker host goes down. Only valid when the partition parameter is set. Default is 3.")
+ public void setLeaderConnectionRetries(int value) {
+ this.leaderConnectionRetries = value;
+ }
+
+ @Parameter(name="connectionRetryInterval", optional=true,
+			description="Interval, in milliseconds, between attempts to find the lead Kafka broker. The number of attempts is set by the leaderConnectionRetries parameter. Only valid when the partition parameter is set. Default is 1000 ms.")
+ public void setConnectionRetryInterval(int value) {
+ this.connectionRetryInterval = value;
+ }
public static final String DESC =
- "This operator acts as a Kafka consumer receiving messages for one or more topics. " +
+ "This operator acts as a Kafka consumer receiving messages for a single topic. " +
"Note that there may be multiple threads receiving messages depending on the configuration specified. " +
"Ordering of messages is not guaranteed." +
"\\n\\n**Behavior in a Consistent Region**" +
- "\\nThis operator cannot be used inside a consistent region."
- ;
+		"\\nThis operator can be used inside a consistent region. Operator-driven and periodic checkpointing " +
+		"are supported. The partition to read from must be specified. To consume multiple partitions of a topic, use " +
+		"user-defined parallelism or multiple consumer operators."
+ ;
+
+ @Override
+ public void close() throws IOException {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void checkpoint(Checkpoint checkpoint) throws Exception {
+ simpleClient.checkpoint(checkpoint);
+ }
+
+ @Override
+ public void drain() throws Exception {
+ simpleClient.drain();
+
+ }
+
+ @Override
+ public void reset(Checkpoint checkpoint) throws Exception {
+ simpleClient.reset(checkpoint);
+ }
+
+ @Override
+ public void resetToInitialState() throws Exception {
+ simpleClient.resetToInitialState();
+ }
+
+ @Override
+ public void retireCheckpoint(long id) throws Exception {
+ simpleClient.retireCheckpoint(id);
+ }
}
diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/SimpleConsumerClient.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/SimpleConsumerClient.java
new file mode 100644
index 0000000..0d282fe
--- /dev/null
+++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/SimpleConsumerClient.java
@@ -0,0 +1,507 @@
+/* Generated by Streams Studio: May 26, 2015 1:20:31 PM EDT */
+package com.ibm.streamsx.messaging.kafka;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.cluster.Broker;
+import kafka.common.ErrorMapping;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.log4j.Logger;
+
+import java.util.Arrays;
+
+import com.ibm.streams.operator.OperatorContext;
+import com.ibm.streams.operator.OutputTuple;
+import com.ibm.streams.operator.StreamingOutput;
+import com.ibm.streams.operator.log4j.TraceLevel;
+import com.ibm.streams.operator.state.Checkpoint;
+import com.ibm.streams.operator.state.ConsistentRegionContext;
+import com.ibm.streams.operator.state.StateHandler;
+
+public class SimpleConsumerClient implements StateHandler {
+
+ private static Logger TRACE = Logger.getLogger(SimpleConsumerClient.class
+ .getCanonicalName());
+
+ private Thread processThread;
+ static final String OPER_NAME = "KafkaConsumer";
+ private ConsistentRegionContext crContext;
+ private boolean shutdown = false;
+ private int triggerCount;
+ private long triggerIteration = 0;
+ private OperatorContext operContext;
+ private boolean inReset = false;
+ // consumer variables
+ public SimpleConsumer myConsumer = null;
+ private ZkClient zkClient;
+ private String topic;
+ private Broker leadBroker;
+ private long readOffset;
+ private int partition;
+ private int so_timeout;
+ private int bufferSize;
+ private int fetchSize;
+ private String clientName;
+ protected Properties finalProperties = new Properties();
+ private String charSet = "UTF-8";
+ private AttributeHelper keyAH;
+ private AttributeHelper messageAH;
+ private int leaderConnectionRetries = 3;
+ private int connectionRetryInterval = 1000;
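+	// Start consuming from the latest available offset when the operator initializes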
+ final long initialOffsetTime = kafka.api.OffsetRequest.LatestTime();
+
+ private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
+ private static final int DEFAULT_FETCH_SIZE = 100 * 1024;
+ private static final int DEFAULT_SO_TIMEOUT = 30 * 1000;
+
+ public SimpleConsumerClient(String a_topic, int a_partition,
+ AttributeHelper keyAH,
+ AttributeHelper messageAH, Properties props, int trigCnt,
+ int connectionRetries, int connectionRetryInterval) {
+ this.topic = a_topic;
+ this.keyAH = keyAH;
+ this.messageAH = messageAH;
+ this.partition = a_partition;
+ this.finalProperties = props;
+ this.triggerCount = trigCnt;
+ this.leaderConnectionRetries = connectionRetries;
+ this.connectionRetryInterval = connectionRetryInterval;
+ }
+
+ /**
+ * Notification that initialization is complete and all input and output
+ * ports are connected and ready to receive and submit tuples.
+ *
+ * @throws Exception
+ * Operator failure, will cause the enclosing PE to terminate.
+ */
+ public synchronized void allPortsReady() throws Exception {
+ Logger.getLogger(this.getClass()).trace(
+ "Operator " + operContext.getName()
+ + " all ports are ready in PE: "
+ + operContext.getPE().getPEId() + " in Job: "
+ + operContext.getPE().getJobId());
+ // Start a thread for producing tuples because operator
+ // implementations must not block and must return control to the caller.
+ processThread.start();
+ }
+
+ @Override
+ public void checkpoint(Checkpoint checkpoint) throws Exception {
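+		// Persist only the next offset to fetch; reset() restores it so consumption resumes from this position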
+ checkpoint.getOutputStream().writeLong(readOffset);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (TRACE.isInfoEnabled())
+ TRACE.log(TraceLevel.INFO, "StateHandler close");
+ }
+
+ @Override
+ public void drain() throws Exception {
+ }
+
+ private Broker findLeader(ZkClient a_zkClient, String a_topic,
+ int a_partition) {
+ int leaderID;
+ Broker a_leadBroker = null;
+
+ try {
+
+			leaderID = ZkUtils.getLeaderForPartition(a_zkClient, a_topic,
+					a_partition).get();
+			a_leadBroker = ZkUtils.getBrokerInfo(a_zkClient, leaderID).get();
+
+ } catch (Exception e) {
+ TRACE.log(TraceLevel.ERROR, "Exception from ZkClient. ", e);
+ }
+
+ return a_leadBroker;
+ }
+
+ private Broker findNewLeader(Broker oldBroker, ZkClient a_zkClient,
+ String a_topic, int a_partition, int numLeaderTries, int sleepBetweenTries) throws IOException{
+
+ for (int i = 0; i < numLeaderTries; i++) {
+ boolean goSleep = false;
+ Broker newLeadBroker = findLeader(a_zkClient, a_topic, a_partition);
+
+ if (newLeadBroker == null) {
+ goSleep = true;
+ } else if (oldBroker != null
+ && newLeadBroker.host().equalsIgnoreCase(oldBroker.host())
+ && i == 0) {
+ // if it's the first time, let's give zookeeper a chance to
+ // recover.
+ goSleep = true;
+ } else {
+ return newLeadBroker;
+ }
+
+ if (goSleep) {
+ try {
+ if(TRACE.isInfoEnabled())
+ TRACE.log(TraceLevel.INFO, "Sleeping after attempt " + (i+1) + ".");
+ Thread.sleep(sleepBetweenTries);
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+
+ throw new IOException("Unable to find a new Kafka lead Broker after " + numLeaderTries + " attempts. Unable to proceed.");
+ }
+
+ private ZkClient getInitializedZkClient() throws Exception{
+ ZkClient localZkClient;
+ String zkConnect = finalProperties.getProperty("zookeeper.connect");
+ int zkSessionTimeout = getIntegerProperty(
+ "zookeeper.session.timeout.ms", 400);
+ int zkConnectionTimeout = getIntegerProperty("zookeeper.sync.time.ms",
+ 200);
+
+ if (TRACE.isInfoEnabled())
+ TRACE.log(TraceLevel.INFO,
+ "Initializing ZooKeeper with values: zkConnect("
+ + zkConnect + ") zkSessionTimeout("
+ + zkSessionTimeout + ") zkConnectionTimeout("
+ + zkConnectionTimeout + ")");
+
+ try {
+ localZkClient = new ZkClient(zkConnect, zkSessionTimeout,
+ zkConnectionTimeout, ZKStringSerializer$.MODULE$);
+ } catch (Exception e) {
+ TRACE.log(TraceLevel.ERROR,
+ "Zookeeper client did not initialize correctly with exception: "
+ + e);
+ throw e;
+ }
+
+ return localZkClient;
+ }
+
+ private int getIntegerProperty(String propName, int defaultVal) {
+ int integerProp = defaultVal;
+ String propVal = finalProperties
+ .getProperty(propName);
+
+ if (propVal != null){
+ try {
+ integerProp = Integer.parseInt(finalProperties
+ .getProperty(propName));
+ } catch (Exception e){
+
+ TRACE.log(TraceLevel.ERROR, "Property " + propName + " was not input as type int.", e);
+ }
+ }
+		TRACE.log(TraceLevel.TRACE, "Property " + propName + " has a final value of " + integerProp);
+ return integerProp;
+ }
+
+ public static long getLastOffset(SimpleConsumer consumer, String topic,
+ int partition, long whichTime, String clientName) {
+ TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
+ partition);
+		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+ requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
+ whichTime, 1));
+ kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
+ requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
+ clientName);
+ OffsetResponse response = consumer.getOffsetsBefore(request);
+
+ if (response.hasError()) {
+ TRACE.log(TraceLevel.ERROR,
+					"Error fetching offset data from the broker. Reason: "
+ + response.errorCode(topic, partition));
+ return 0;
+ }
+ long[] offsets = response.offsets(topic, partition);
+ TRACE.log(TraceLevel.TRACE,
+ "Retrieving offsets: " + Arrays.toString(offsets));
+ return offsets[0];
+ }
+
+ private void handleFetchError(FetchResponse fetchResponse) throws IOException {
+ short code = fetchResponse.errorCode(topic, partition);
+ TRACE.log(
+ TraceLevel.ERROR,
+				"Error fetching data from the broker: "
+ + leadBroker.host() + " Reason: " + code);
+
+ if (code == ErrorMapping.OffsetOutOfRangeCode()) {
+ // We asked for an invalid offset. This should never
+ // happen.
+ TRACE.log(TraceLevel.ERROR,
+ "Tried to request an invalid offset. Exiting.");
+ }
+ myConsumer.close();
+ myConsumer = null;
+ leadBroker = findNewLeader(null, zkClient, topic, partition, leaderConnectionRetries, connectionRetryInterval);
+ }
+
+ public void initialize(OperatorContext context) throws Exception {
+
+ Logger.getLogger(this.getClass()).trace(
+ "Operator " + context.getName() + " initializing in PE: "
+ + context.getPE().getPEId() + " in Job: "
+ + context.getPE().getJobId());
+ operContext = context;
+ crContext = context.getOptionalContext(ConsistentRegionContext.class);
+ TRACE.log(TraceLevel.TRACE, "Beginning consumer initialization");
+
+ // name client
+ clientName = topic + "_partition_" + Integer.toString(partition)
+ + "_pe_" + context.getPE().getPEId();
+ TRACE.log(TraceLevel.TRACE, "Initializing consumer with clientName: "
+ + clientName);
+
+ so_timeout = getIntegerProperty("socket.timeout.ms", DEFAULT_SO_TIMEOUT);
+ bufferSize = getIntegerProperty("simpleConsumer.bufferSize.bytes", DEFAULT_BUFFER_SIZE);
+ fetchSize = getIntegerProperty("simpleConsumer.fetchSize.bytes", DEFAULT_FETCH_SIZE);
+
+
+ zkClient = getInitializedZkClient();
+ leadBroker = findNewLeader(null, zkClient, topic, partition, leaderConnectionRetries, connectionRetryInterval);
+
+ if (leadBroker == null) {
+ TRACE.log(TraceLevel.ERROR,
+ "Can't find Leader for Topic and Partition. Exiting.");
+ shutdown();
+ return;
+ }
+
+ if (!keyAH.isAvailable()){
+ throw new Exception("Specified output attribute " + keyAH.getName() + " is not available.");
+ }
+
+ if (!messageAH.isAvailable()){
+ throw new Exception("Specified output attribute " + messageAH.getName() + " is not available.");
+ }
+
+
+ myConsumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(),
+ so_timeout, bufferSize, clientName);
+
+ if (TRACE.isInfoEnabled())
+ TRACE.log(TraceLevel.INFO,
+ "Initializing SimpleConsumer with values: leadBroker("
+ + leadBroker.host() + ":" + leadBroker.port()
+ + ") socket timeout(" + so_timeout
+ + ") bufferSize(" + bufferSize + ") clientName(" + clientName + ")");
+
+
+
+ readOffset = getLastOffset(myConsumer, topic, partition,
+ initialOffsetTime, clientName);
+
+
+ /*
+ * Create the thread for producing tuples. The thread is created at
+		 * initialize time but not started. The thread will be started by
+ * allPortsReady().
+ */
+ processThread = context.getThreadFactory().newThread(new Runnable() {
+
+ @Override
+ public void run() {
+ try {
+ produceTuples();
+ } catch (Exception e) {
+					TRACE.log(TraceLevel.ERROR, "Operator error: " + e.getMessage(), e);
+ Logger.getLogger(this.getClass())
+ .error("Operator error", e); //$NON-NLS-1$
+ }
+ }
+
+ });
+
+ /*
+ * Set the thread not to be a daemon to ensure that the SPL runtime will
+ * wait for the thread to complete before determining the operator is
+ * complete.
+ */
+ processThread.setDaemon(false);
+ }
+
+ /**
+ * Submit new tuples to the output stream
+ *
+ * @throws Exception
+ * if an error occurs while submitting a tuple
+ */
+	private void produceTuples() throws UnsupportedEncodingException, IOException {
+		final StreamingOutput<OutputTuple> out = operContext
+ .getStreamingOutputs().get(0);
+ OutputTuple tuple = out.newTuple();
+ long numRead;
+ long currentOffset;
+ FetchResponse fetchResponse;
+
+ while (!shutdown) {
+
+
+ if (myConsumer == null) {
+ myConsumer = new SimpleConsumer(leadBroker.host(),
+ leadBroker.port(), so_timeout, bufferSize, clientName);
+ }
+
+ FetchRequest req = new FetchRequestBuilder().clientId(clientName)
+ .addFetch(topic, partition, readOffset, fetchSize).build();
+
+ try {
+ fetchResponse = myConsumer.fetch(req);
+ } catch (Exception e) {
+				TRACE.log(TraceLevel.ERROR,
+						"Fetch error. Lead broker cannot be contacted.", e);
+ myConsumer.close();
+ myConsumer = null;
+ fetchResponse = null;
+ leadBroker = findNewLeader(leadBroker, zkClient, topic,
+ partition, leaderConnectionRetries, connectionRetryInterval);
+ }
+
+ if (fetchResponse != null) {
+
+ if (fetchResponse.hasError()) {
+ // Something went wrong!
+ handleFetchError(fetchResponse);
+ } else {
+ numRead = 0;
+ for (MessageAndOffset messageAndOffset : fetchResponse
+ .messageSet(topic, partition)) {
+ try {
+ if (crContext != null) {
+ crContext.acquirePermit();
+ }
+ // if there has been a reset, we NEED to get out of
+ // this loop and do a new fetch request
+ if (inReset) {
+ inReset = false;
+ break;
+ }
+
+ currentOffset = messageAndOffset.offset();
+ tuple = out.newTuple();
+
+ if (currentOffset < readOffset) {
+ TRACE.log(TraceLevel.ERROR,
+ "Found an old offset: " + currentOffset
+ + " Expecting: " + readOffset);
+ }
+
+ ByteBuffer messagePayload = messageAndOffset
+ .message().payload();
+
+ byte[] messageBytes = new byte[messagePayload
+ .limit()];
+ messagePayload.get(messageBytes);
+ String message = new String(messageBytes, charSet);
+
+ ByteBuffer keyPayload = messageAndOffset.message()
+ .key();
+ byte[] keyBytes = new byte[keyPayload.limit()];
+ keyPayload.get(keyBytes);
+ String key = new String(keyBytes, charSet);
+
+ // Set attributes in tuple:
+ tuple.setString(messageAH.getName(), message);
+ tuple.setString(keyAH.getName(), key);
+
+ numRead++;
+ // Submit tuple to output stream
+ out.submit(tuple);
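+							// Advance readOffset past this message only after the tuple is submitted, so a checkpoint records the next message to fetch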
+ readOffset = messageAndOffset.nextOffset();
+
+ if (crContext != null
+ && crContext.isTriggerOperator()) {
+ triggerIteration++;
+ if (triggerIteration >= triggerCount) {
+ crContext.makeConsistent();
+ triggerIteration = 0;
+ }
+ }
+ } catch (Exception e) {
+ TRACE.log(TraceLevel.ERROR,
+ "Unexpected exception: " + e.toString());
+ } finally {
+ // release permit when done submitting
+ if (crContext != null) {
+ crContext.releasePermit();
+ }
+ }
+ }
+ if (numRead == 0) {
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException ie) {
+ TRACE.log(TraceLevel.ERROR,
+ "Exception while sleeping: " + ie);
+ }
+ }
+
+ }
+
+ }
+ }
+ if (myConsumer != null)
+ myConsumer.close();
+ }
+
+ @Override
+ public void reset(Checkpoint checkpoint) throws Exception {
+ if (TRACE.isInfoEnabled())
+ TRACE.log(TraceLevel.INFO,
+ "Reset to checkpoint " + checkpoint.getSequenceId());
+ readOffset = checkpoint.getInputStream().readLong();
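+		// Signal produceTuples() to abandon its current message loop and issue a new fetch from the restored offset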
+ inReset = true;
+ }
+
+ @Override
+ public void resetToInitialState() throws Exception {
+ if (TRACE.isInfoEnabled())
+ TRACE.log(TraceLevel.INFO, "Reset to initial state");
+ }
+
+ @Override
+ public void retireCheckpoint(long id) throws Exception {
+ }
+
+ /**
+	 * Shut down this operator, which will interrupt the thread executing the
+	 * produceTuples() method.
+ *
+ * @throws Exception
+ * Operator failure, will cause the enclosing PE to terminate.
+ */
+ public synchronized void shutdown() throws Exception {
+ shutdown = true;
+ if (processThread != null) {
+ processThread.interrupt();
+ processThread = null;
+ }
+
+ Logger.getLogger(this.getClass()).trace(
+ "Operator " + operContext.getName() + " shutting down in PE: "
+ + operContext.getPE().getPEId() + " in Job: "
+ + operContext.getPE().getJobId());
+
+ }
+
+}
diff --git a/samples/KafkaConsistentRegionConsumerParallel/.classpath b/samples/KafkaConsistentRegionConsumerParallel/.classpath
new file mode 100644
index 0000000..36e3d95
--- /dev/null
+++ b/samples/KafkaConsistentRegionConsumerParallel/.classpath
@@ -0,0 +1,11 @@
+
+
+
+
+
+
+
+
+
+
+
diff --git a/samples/KafkaConsistentRegionConsumerParallel/.project b/samples/KafkaConsistentRegionConsumerParallel/.project
new file mode 100644
index 0000000..a7fbe31
--- /dev/null
+++ b/samples/KafkaConsistentRegionConsumerParallel/.project
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>KafkaConsistentRegionConsumerParallel</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>com.ibm.streams.studio.splproject.builder.SPLProjectBuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>org.eclipse.xtext.ui.shared.xtextBuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>com.ibm.streams.studio.splproject.SPLProjectNature</nature>
+		<nature>org.eclipse.xtext.ui.shared.xtextNature</nature>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+	</natures>
+</projectDescription>
diff --git a/samples/KafkaConsistentRegionConsumerParallel/Makefile b/samples/KafkaConsistentRegionConsumerParallel/Makefile
new file mode 100644
index 0000000..0624e30
--- /dev/null
+++ b/samples/KafkaConsistentRegionConsumerParallel/Makefile
@@ -0,0 +1,31 @@
+# Copyright (C) 2015, International Business Machines Corporation
+# All Rights Reserved
+
+.PHONY: all clean
+
+TOOLKIT_NAME=com.ibm.streamsx.messaging
+STREAMS_MESSAGING_TOOLKIT ?= $(shell ([ -e ../../$(TOOLKIT_NAME)/toolkit.xml ] && echo ../../$(TOOLKIT_NAME)) ||\
+ ([ -e "../$(TOOLKIT_NAME)" ] && echo ../$(TOOLKIT_NAME)) ||\
+ echo $(STREAMS_INSTALL)/toolkits/$(TOOLKIT_NAME))
+
+
+SPLC_FLAGS ?= -a --data-directory data
+SPLC = $(STREAMS_INSTALL)/bin/sc
+
+SPL_CMD_ARGS ?= -t $(STREAMS_MESSAGING_TOOLKIT)
+SPL_MAIN_COMPOSITE = application::ConsistentRegionConsumerParallel
+
+all: distributed
+
+data:
+ mkdir data
+
+standalone: data
+ $(SPLC) $(SPLC_FLAGS) -T -M $(SPL_MAIN_COMPOSITE) $(SPL_CMD_ARGS)
+
+distributed: data
+ $(SPLC) $(SPLC_FLAGS) -M $(SPL_MAIN_COMPOSITE) $(SPL_CMD_ARGS)
+
+clean:
+ $(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE)
+ rm data/*.out
\ No newline at end of file
diff --git a/samples/KafkaConsistentRegionConsumerParallel/application/.namespace b/samples/KafkaConsistentRegionConsumerParallel/application/.namespace
new file mode 100644
index 0000000..e69de29
diff --git a/samples/KafkaConsistentRegionConsumerParallel/application/ConsistentRegionConsumerParallel.spl b/samples/KafkaConsistentRegionConsumerParallel/application/ConsistentRegionConsumerParallel.spl
new file mode 100644
index 0000000..af88690
--- /dev/null
+++ b/samples/KafkaConsistentRegionConsumerParallel/application/ConsistentRegionConsumerParallel.spl
@@ -0,0 +1,77 @@
+namespace application ;
+
+use com.ibm.streamsx.messaging.kafka::* ;
+/**
+ * Read from a three-partition Kafka topic using the KafkaConsumer operator
+ * in a consistent region (guaranteed tuple processing). This sample contains
+ * two consistent regions. The first includes the beacon and the KafkaProducer,
+ * the second includes the three parallel KafkaConsumers and the MessagePrinter.
+ * Kafka only guarantees ordering of tuples within a single partition. This application
+ * provides the same guarantee, but because it reads from three separate partitions,
+ * ordering across partitions can be lost. Depending on the key/message, order can be
+ * recovered after consuming from the Kafka server.
+ *
+ * Make sure you have created your topic before launching:
+ * bin/kafka-topics.sh --create --zookeeper <zookeeper_host>:2181 --partitions 3 --topic myParallelTopic
+ *
+ * Edit the consumer.properties and producer.properties files found in the etc directory to include
+ * your Kafka properties.
+ *
+ * Build using Studio or the provided Makefile.
+ *
+ * Check results by looking at messagesReceived.out in the data directory.
+ *
+ * Consistent regions are not supported in standalone mode, so this sample should be run in
+ * distributed mode.
+ *
+ *
+ */
+composite ConsistentRegionConsumerParallel
+{
+ graph
+ //generate data to be written to a kafka server
+ @consistent(trigger = operatorDriven)
+		stream<rstring topic, rstring message, rstring key> OutputStream = Beacon()
+ {
+ param
+ period : 0.25 ;
+ initDelay : 4.0 ;
+ triggerCount : 20u ;
+ output
+ OutputStream : topic = "myParallelTopic", message =(rstring)
+ IterationCount(), key =(rstring)(int32)(random() * 10.0) ;
+ }
+
+ //Write to Kafka Server
+ () as KafkaSinkOp = KafkaProducer(OutputStream)
+ {
+ param
+ propertiesFile : "etc/producer.properties" ;
+ }
+
+ //Read in from a kafka server and start consistent region
+ @parallel(width = 3) @consistent(trigger = periodic, period = 5.0)
+		stream<rstring key, rstring message> KafkaConsumerOut = KafkaConsumer()
+ {
+ param
+ propertiesFile : "etc/consumer.properties" ;
+ topic : "myParallelTopic" ;
+ partition : getChannel() ;
+ }
+
+
+ //Print out data to a file
+ () as MessagePrinter = FileSink(KafkaConsumerOut)
+ {
+ param
+ file : "messagesReceived.out" ;
+ flush : 1u ;
+ format : csv ;
+ }
+
+ () as JCP = JobControlPlane()
+ {
+ }
+
+ }
+
diff --git a/samples/KafkaConsistentRegionConsumerParallel/etc/consumer.properties b/samples/KafkaConsistentRegionConsumerParallel/etc/consumer.properties
new file mode 100644
index 0000000..fee372e
--- /dev/null
+++ b/samples/KafkaConsistentRegionConsumerParallel/etc/consumer.properties
@@ -0,0 +1,6 @@
+zookeeper.connect=zk.host.1:2181,zk.host.2:2181,zk.host.3:2181
+serializer.class=kafka.serializer.StringEncoder
+group.id=mygroup
+zookeeper.session.timeout.ms=4000
+zookeeper.sync.time.ms=2000
+auto.commit.interval.ms=1000
\ No newline at end of file
diff --git a/samples/KafkaConsistentRegionConsumerParallel/etc/producer.properties b/samples/KafkaConsistentRegionConsumerParallel/etc/producer.properties
new file mode 100644
index 0000000..ebf7396
--- /dev/null
+++ b/samples/KafkaConsistentRegionConsumerParallel/etc/producer.properties
@@ -0,0 +1,3 @@
+metadata.broker.list=broker.host.1:9092,broker.host.2:9092,broker.host.3:9092
+serializer.class=kafka.serializer.StringEncoder
+request.required.acks=1
\ No newline at end of file
diff --git a/samples/KafkaConsistentRegionConsumerParallel/info.xml b/samples/KafkaConsistentRegionConsumerParallel/info.xml
new file mode 100644
index 0000000..5cc8e9b
--- /dev/null
+++ b/samples/KafkaConsistentRegionConsumerParallel/info.xml
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<info:toolkitInfoModel xmlns:common="http://www.ibm.com/xmlns/prod/streams/spl/common" xmlns:info="http://www.ibm.com/xmlns/prod/streams/spl/toolkitInfo">
+  <info:identity>
+    <info:name>ConsistentRegionConsumerParallel</info:name>
+    <info:description></info:description>
+    <info:version>1.0.0</info:version>
+    <info:requiredProductVersion>4.0.1.0</info:requiredProductVersion>
+  </info:identity>
+  <info:dependencies>
+    <info:toolkit>
+      <common:name>com.ibm.streamsx.messaging</common:name>
+      <common:version>[2.0.2,3.0.0)</common:version>
+    </info:toolkit>
+  </info:dependencies>
+</info:toolkitInfoModel>
\ No newline at end of file
diff --git a/samples/KafkaConsistentRegionConsumerSimple/.classpath b/samples/KafkaConsistentRegionConsumerSimple/.classpath
new file mode 100644
index 0000000..36e3d95
--- /dev/null
+++ b/samples/KafkaConsistentRegionConsumerSimple/.classpath
@@ -0,0 +1,11 @@
+
+
+
+
+
+
+
+
+
+
+
diff --git a/samples/KafkaConsistentRegionConsumerSimple/.gitignore b/samples/KafkaConsistentRegionConsumerSimple/.gitignore
new file mode 100644
index 0000000..01afdcf
--- /dev/null
+++ b/samples/KafkaConsistentRegionConsumerSimple/.gitignore
@@ -0,0 +1 @@
+/impl/
diff --git a/samples/KafkaConsistentRegionConsumerSimple/.project b/samples/KafkaConsistentRegionConsumerSimple/.project
new file mode 100644
index 0000000..57bb480
--- /dev/null
+++ b/samples/KafkaConsistentRegionConsumerSimple/.project
@@ -0,0 +1,29 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<projectDescription>
+	<name>KafkaConsistentRegionConsumerSimple</name>
+	<comment></comment>
+	<projects>
+	</projects>
+	<buildSpec>
+		<buildCommand>
+			<name>org.eclipse.jdt.core.javabuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>com.ibm.streams.studio.splproject.builder.SPLProjectBuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+		<buildCommand>
+			<name>org.eclipse.xtext.ui.shared.xtextBuilder</name>
+			<arguments>
+			</arguments>
+		</buildCommand>
+	</buildSpec>
+	<natures>
+		<nature>com.ibm.streams.studio.splproject.SPLProjectNature</nature>
+		<nature>org.eclipse.xtext.ui.shared.xtextNature</nature>
+		<nature>org.eclipse.jdt.core.javanature</nature>
+	</natures>
+</projectDescription>
diff --git a/samples/KafkaConsistentRegionConsumerSimple/Makefile b/samples/KafkaConsistentRegionConsumerSimple/Makefile
new file mode 100644
index 0000000..8e5b0d2
--- /dev/null
+++ b/samples/KafkaConsistentRegionConsumerSimple/Makefile
@@ -0,0 +1,31 @@
+# Copyright (C) 2015, International Business Machines Corporation
+# All Rights Reserved
+
+.PHONY: all clean
+
+TOOLKIT_NAME=com.ibm.streamsx.messaging
+STREAMS_MESSAGING_TOOLKIT ?= $(shell ([ -e ../../$(TOOLKIT_NAME)/toolkit.xml ] && echo ../../$(TOOLKIT_NAME)) ||\
+ ([ -e "../$(TOOLKIT_NAME)" ] && echo ../$(TOOLKIT_NAME)) ||\
+ echo $(STREAMS_INSTALL)/toolkits/$(TOOLKIT_NAME))
+
+
+SPLC_FLAGS ?= -a --data-directory data
+SPLC = $(STREAMS_INSTALL)/bin/sc
+
+SPL_CMD_ARGS ?= -t $(STREAMS_MESSAGING_TOOLKIT)
+SPL_MAIN_COMPOSITE = application::ConsistentRegionConsumerSimple
+
+all: distributed
+
+data:
+ mkdir data
+
+standalone: data
+ $(SPLC) $(SPLC_FLAGS) -T -M $(SPL_MAIN_COMPOSITE) $(SPL_CMD_ARGS)
+
+distributed: data
+ $(SPLC) $(SPLC_FLAGS) -M $(SPL_MAIN_COMPOSITE) $(SPL_CMD_ARGS)
+
+clean:
+ $(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE)
+ rm data/*.out
\ No newline at end of file
diff --git a/samples/KafkaConsistentRegionConsumerSimple/application/.namespace b/samples/KafkaConsistentRegionConsumerSimple/application/.namespace
new file mode 100644
index 0000000..e69de29
diff --git a/samples/KafkaConsistentRegionConsumerSimple/application/ConsistentRegionConsumerSimple.spl b/samples/KafkaConsistentRegionConsumerSimple/application/ConsistentRegionConsumerSimple.spl
new file mode 100644
index 0000000..62938a6
--- /dev/null
+++ b/samples/KafkaConsistentRegionConsumerSimple/application/ConsistentRegionConsumerSimple.spl
@@ -0,0 +1,72 @@
+namespace application ;
+
+use com.ibm.streamsx.messaging.kafka::* ;
+/**
+ * Read from a single-partition Kafka topic using the KafkaConsumer operator
+ * in a consistent region (guaranteed tuple processing). This sample contains
+ * two consistent regions. The first includes the beacon and the KafkaProducer,
+ * the second includes the KafkaConsumer and the MessagePrinter.
+ *
+ * Make sure you have created your topic before launching:
+ * bin/kafka-topics.sh --create --zookeeper <zookeeper_host>:2181 --partitions 1 --topic mySimpleTopic
+ *
+ * Edit the consumer.properties and producer.properties files found in the etc directory to include
+ * your Kafka properties.
+ *
+ * Build using Studio or the provided Makefile.
+ *
+ * Check results by looking at messagesReceived.out in the data directory.
+ *
+ * Consistent regions are not supported in standalone mode, so this sample should be run in
+ * distributed mode.
+ *
+ *
+ */
+composite ConsistentRegionConsumerSimple
+{
+ graph
+ //generate data to be written to a kafka server
+ @consistent(trigger = operatorDriven)
+		stream<rstring topic, rstring message, rstring key> OutputStream = Beacon()
+ {
+ param
+ period : 0.25 ;
+ initDelay : 4.0 ;
+ triggerCount : 20u ;
+ output
+ OutputStream : topic = "mySimpleTopic", message =(rstring)
+ IterationCount(), key =(rstring)(int32)(random() * 10.0) ;
+ }
+
+ //Write to Kafka Server
+ () as KafkaSinkOp = KafkaProducer(OutputStream)
+ {
+ param
+ propertiesFile : "etc/producer.properties" ;
+ }
+
+ //Read in from a kafka server and start consistent region
+		@consistent(trigger = operatorDriven)
+		stream<rstring key, rstring message> KafkaConsumerOut = KafkaConsumer()
+ {
+ param
+ propertiesFile : "etc/consumer.properties" ;
+ topic : "mySimpleTopic" ;
+ partition : 0 ;
+ triggerCount : 20 ;
+ }
+
+ //Print out data to a file
+ () as MessagePrinter = FileSink(KafkaConsumerOut)
+ {
+ param
+ file : "messagesReceived.out" ;
+ flush : 1u ;
+ format : csv ;
+ }
+
+ () as JCP = JobControlPlane()
+ {
+ }
+
+ }
diff --git a/samples/KafkaConsistentRegionConsumerSimple/etc/consumer.properties b/samples/KafkaConsistentRegionConsumerSimple/etc/consumer.properties
new file mode 100644
index 0000000..fee372e
--- /dev/null
+++ b/samples/KafkaConsistentRegionConsumerSimple/etc/consumer.properties
@@ -0,0 +1,6 @@
+zookeeper.connect=zk.host.1:2181,zk.host.2:2181,zk.host.3:2181
+serializer.class=kafka.serializer.StringEncoder
+group.id=mygroup
+zookeeper.session.timeout.ms=4000
+zookeeper.sync.time.ms=2000
+auto.commit.interval.ms=1000
\ No newline at end of file
diff --git a/samples/KafkaConsistentRegionConsumerSimple/etc/producer.properties b/samples/KafkaConsistentRegionConsumerSimple/etc/producer.properties
new file mode 100644
index 0000000..ebf7396
--- /dev/null
+++ b/samples/KafkaConsistentRegionConsumerSimple/etc/producer.properties
@@ -0,0 +1,3 @@
+metadata.broker.list=broker.host.1:9092,broker.host.2:9092,broker.host.3:9092
+serializer.class=kafka.serializer.StringEncoder
+request.required.acks=1
\ No newline at end of file
diff --git a/samples/KafkaConsistentRegionConsumerSimple/info.xml b/samples/KafkaConsistentRegionConsumerSimple/info.xml
new file mode 100644
index 0000000..6149e3f
--- /dev/null
+++ b/samples/KafkaConsistentRegionConsumerSimple/info.xml
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<info:toolkitInfoModel xmlns:common="http://www.ibm.com/xmlns/prod/streams/spl/common" xmlns:info="http://www.ibm.com/xmlns/prod/streams/spl/toolkitInfo">
+  <info:identity>
+    <info:name>ConsistentRegionConsumerSimple</info:name>
+    <info:description></info:description>
+    <info:version>1.0.0</info:version>
+    <info:requiredProductVersion>4.0.1.0</info:requiredProductVersion>
+  </info:identity>
+  <info:dependencies>
+    <info:toolkit>
+      <common:name>com.ibm.streamsx.messaging</common:name>
+      <common:version>[2.0.2,3.0.0)</common:version>
+    </info:toolkit>
+  </info:dependencies>
+</info:toolkitInfoModel>
\ No newline at end of file