Skip to content

Commit

Permalink
Merge pull request #115 from Alex-Cook4/master
Browse files Browse the repository at this point in the history
Consistent Region support for Kafka Consumer
  • Loading branch information
chanskw committed Aug 13, 2015
2 parents 9eda955 + c3e2697 commit 1083750
Show file tree
Hide file tree
Showing 24 changed files with 999 additions and 27 deletions.
2 changes: 1 addition & 1 deletion LICENSE.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
##License
##License

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this except in compliance with the License.
Expand Down
8 changes: 4 additions & 4 deletions com.ibm.streamsx.messaging/.classpath
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
<classpathentry kind="lib" path="opt/downloaded/jline-0.9.94.jar"/>
<classpathentry kind="lib" path="opt/downloaded/jopt-simple-3.2.jar"/>
<classpathentry kind="lib" path="opt/downloaded/junit-3.8.1.jar"/>
<classpathentry kind="lib" path="opt/downloaded/kafka_2.8.0-0.8.1.jar"/>
<classpathentry kind="lib" path="opt/downloaded/log4j-1.2.15.jar"/>
<classpathentry kind="lib" path="opt/downloaded/mail-1.4.jar"/>
<classpathentry kind="lib" path="opt/downloaded/metrics-annotation-2.2.0.jar"/>
<classpathentry kind="lib" path="opt/downloaded/metrics-core-2.2.0.jar"/>
<classpathentry kind="lib" path="opt/downloaded/scala-library-2.8.0.jar"/>
<classpathentry kind="lib" path="opt/downloaded/snappy-java-1.0.5.jar"/>
Expand All @@ -18,8 +16,10 @@
<classpathentry kind="lib" path="opt/downloaded/activemq-client-5.11-SNAPSHOT.jar"/>
<classpathentry kind="lib" path="opt/downloaded/geronimo-j2ee-management_1.1_spec-1.0.1.jar"/>
<classpathentry kind="lib" path="opt/downloaded/geronimo-jms_1.1_spec-1.1.1.jar"/>
<classpathentry kind="lib" path="opt/downloaded/slf4j-api-1.7.5.jar"/>
<classpathentry kind="con" path="com.ibm.streams.java/com.ibm.streams.operator"/>
<classpathentry kind="lib" path="opt/downloaded/org.eclipse.paho.client.mqttv3-1.0.1-SNAPSHOT.jar"/>
<classpathentry kind="lib" path="opt/downloaded/hawtbuf-1.11.jar"/>
<classpathentry kind="lib" path="opt/downloaded/kafka_2.8.0-0.8.1.1.jar"/>
<classpathentry kind="lib" path="opt/downloaded/org.eclipse.paho.client.mqttv3-1.0.1.jar"/>
<classpathentry kind="lib" path="opt/downloaded/slf4j-api-1.7.10.jar"/>
<classpathentry kind="output" path="impl/java/bin"/>
</classpath>
2 changes: 1 addition & 1 deletion com.ibm.streamsx.messaging/.gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/output
com.ibm.streamsx.messaging.kafka
impl/lib
com.ibm.streamsx.messaging.kafka
opt/downloaded
/target
/.externalToolBuilders
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ void setValue(OutputTuple otup, byte[] value) {
otup.setString(name, new String(value, CS));
else
otup.setBlob(name, ValueFactory.newBlob(value));
}
}

String getString(Tuple tuple) throws IOException {
if(!isAvailable) return null;
if(isString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public abstract class KafkaBaseOper extends AbstractOperator {
messageAH = new AttributeHelper("message");
protected List<String> topics = new ArrayList<String>();
protected KafkaClient client = null;
protected SimpleConsumerClient simpleClient = null;
private final Logger trace = Logger.getLogger(KafkaBaseOper.class
.getCanonicalName());

Expand Down Expand Up @@ -70,8 +71,15 @@ public void initSchema(StreamSchema ss) throws Exception {
supportedTypes.remove(MetaType.BLOB);
topicAH.initialize(ss, false, supportedTypes);

trace.log(TraceLevel.INFO, "Creating client");
client = new KafkaClient(topicAH, keyAH, messageAH, finalProperties);


OperatorContext operContext = getOperatorContext();
if (operContext.getParameterNames().contains("partition") == false){
//we don't need to create this client if using simpleConsumerClient
trace.log(TraceLevel.INFO, "Creating client");
client = new KafkaClient(topicAH, keyAH, messageAH, finalProperties);
}

}

@Parameter(cardinality = -1, optional = true, description = "Specify a Kafka property \\\"key=value\\\" form. "
Expand All @@ -94,8 +102,9 @@ public void setPropertiesFile(String value) {
}

public String getPropertiesFile() {

File file = new File(propertiesFile);
trace.log(TraceLevel.TRACE, "Properties file: " + propertiesFile);
if (propertiesFile == null) return null;
File file = new File(propertiesFile);

// if the properties file is relative, the path is relative to the application directory
if (!file.isAbsolute())
Expand All @@ -105,12 +114,12 @@ public String getPropertiesFile() {
return propertiesFile;
}

@Parameter(optional = true, description = "Name of the attribute for the message. This attribute is required. Default is \\\"message\\\"")
@Parameter(optional = true, description = "Name of the attribute for the message. This attribute is required. Default is \\\"message\\\".")
public void setMessageAttribute(String value) {
messageAH.setName(value);
}

@Parameter(optional = true, description = "Name of the attribute for the key. Default is \\\"key\\\"")
@Parameter(optional = true, description = "Name of the attribute for the key. Default is \\\"key\\\".")
public void setKeyAttribute(String value) {
keyAH.setName(value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.ibm.streamsx.messaging.kafka;


import java.io.IOException;
import java.util.List;
import java.util.logging.Logger;

Expand All @@ -18,46 +19,102 @@
import com.ibm.streams.operator.model.OutputPorts;
import com.ibm.streams.operator.model.Parameter;
import com.ibm.streams.operator.model.PrimitiveOperator;
import com.ibm.streams.operator.state.Checkpoint;
import com.ibm.streams.operator.state.ConsistentRegionContext;
import com.ibm.streams.operator.state.StateHandler;

@OutputPorts(@OutputPortSet(cardinality=1, optional=false,
description="Messages received from Kafka are sent on this output port."))
@PrimitiveOperator(name=KafkaSource.OPER_NAME, description=KafkaSource.DESC)
@Icons(location16="icons/KafkaConsumer_16.gif", location32="icons/KafkaConsumer_32.gif")
public class KafkaSource extends KafkaBaseOper {
public class KafkaSource extends KafkaBaseOper implements StateHandler{

static final String OPER_NAME = "KafkaConsumer";
private int threadsPerTopic = 1;

private int a_partition = -1;
private int triggerCount = -1;
private int leaderConnectionRetries = 3;
private int connectionRetryInterval = 1000;
private static Logger trace = Logger.getLogger(KafkaSource.class.getName());

//consistent region checks
@ContextCheck(compile = true)
public static void checkInConsistentRegion(OperatorContextChecker checker) {
ConsistentRegionContext consistentRegionContext =
checker.getOperatorContext().getOptionalContext(ConsistentRegionContext.class);

OperatorContext operContext = checker.getOperatorContext();

if(consistentRegionContext != null ) {
checker.setInvalidContext( OPER_NAME + " operator cannot be used inside a consistent region.",
new String[] {});
//checker.setInvalidContext( OPER_NAME + " operator cannot be used inside a consistent region.",
// new String[] {});
if (!operContext.getParameterNames().contains("partition")){
checker.setInvalidContext("The partition parameter must be specified in consistent regions.", new String[] {});
}
}
}


//simple consumer client checks
@ContextCheck(runtime = false, compile = true)
public static void checkCompileCompatability(OperatorContextChecker checker) {
OperatorContext operContext = checker.getOperatorContext();

if (!operContext.getParameterNames().contains("propertiesFile")
&& !operContext.getParameterNames().contains("kafkaProperty")){
checker.setInvalidContext("Missing properties: Neither propertiesFile nor kafkaProperty parameters are set. At least one must be set.",new String[] {});

}

}

//simple consumer client checks
@ContextCheck(runtime = true, compile = false)
public static void checkRuntimeCompatability(OperatorContextChecker checker) {
OperatorContext operContext = checker.getOperatorContext();

if (operContext.getParameterNames().contains("partition")){

if (operContext.getParameterValues("topic").size() > 1){
checker.setInvalidContext("Invalid topic parameter: Only one topic can be specified when the partition parameter is set.", new String[] {});
throw new IllegalArgumentException("Invalid topic parameter: Only one topic can be specified when the partition parameter is set.");
}

}

}

@Override
public void initialize(OperatorContext context)
throws Exception {
super.initialize(context);
super.initSchema(getOutput(0).getStreamSchema());



getOutput(0);
if(threadsPerTopic < 1)
throw new IllegalArgumentException("Number of threads per topic cannot be less than one: " + threadsPerTopic);
throw new IllegalArgumentException("Number of threads per topic cannot be less than one: " + threadsPerTopic);
ConsistentRegionContext crContext = getOperatorContext().getOptionalContext(ConsistentRegionContext.class);
if( crContext != null){

if (a_partition == -1){
throw new IllegalArgumentException("Partition parameter must be specified when using consistent region");
}

}
}

@Override
public void allPortsReady() throws Exception {
//initialize the client
trace.log(TraceLevel.INFO, "Initializing client");
client.initConsumer(getOutput(0), getOperatorContext().getThreadFactory(), topics, threadsPerTopic);
trace.log(TraceLevel.INFO, "Initializing client");
if(a_partition >= 0){
trace.log(TraceLevel.INFO, "Using simple consumer client.");
simpleClient = new SimpleConsumerClient(topics.get(0), a_partition, keyAH, messageAH, finalProperties, triggerCount, leaderConnectionRetries, connectionRetryInterval);
simpleClient.initialize(getOperatorContext());
simpleClient.allPortsReady();
} else {
trace.log(TraceLevel.INFO, "Using high level consumer client.");
client.initConsumer(getOutput(0), getOperatorContext().getThreadFactory(), topics, threadsPerTopic);
}
}

@Parameter(name="threadsPerTopic", optional=true,
Expand All @@ -70,15 +127,73 @@ public void setThreadsPerTopic(int value) {
public void setTopic(List<String> values) {
if(values!=null)
topics.addAll(values);
}
}

@Parameter(name="partition", optional=true,
description="Partition to subscribe to. This must be set when using consistent region.")
public void setPartition(int value) {
this.a_partition = value;
}

@Parameter(name="triggerCount", optional=true,
description="Number of messages between checkpointing for consistent region. This is only relevant to operator driven checkpointing.")
public void setTriggerCount(int value) {
this.triggerCount = value;
}

@Parameter(name="leaderConnectionRetries", optional=true,
description="Number of attempts at finding a Kafka Broker before giving up. This is relevant when first looking for a broker, and in the case that a lead broker host goes down. This is only valid when the partition parameter is set. Default is 3.")
public void setLeaderConnectionRetries(int value) {
this.leaderConnectionRetries = value;
}

@Parameter(name="connectionRetryInterval", optional=true,
description="Interval between each attempt to find a lead Kafka Broker in milliseconds. Number of attempts is set by the leaderConnectionRetries parameter. This is only valid when the partition parameter is set. Default is 1000 ms.")
public void setConnectionRetryInterval(int value) {
this.connectionRetryInterval = value;
}

public static final String DESC =
"This operator acts as a Kafka consumer receiving messages for one or more topics. " +
"This operator acts as a Kafka consumer receiving messages for a single topic. " +
"Note that there may be multiple threads receiving messages depending on the configuration specified. " +
"Ordering of messages is not guaranteed." +
"\\n\\n**Behavior in a Consistent Region**" +
"\\nThis operator cannot be used inside a consistent region."
;
"\\nThis operator can be used inside a consistent region. Operator driven and periodical checkpointing " +
"are supported. Partition to be read from must be specified. To consume multiple partitions in a topic, use " +
"user defined parallelism or multiple consumers."
;

@Override
public void close() throws IOException {
// TODO Auto-generated method stub

}

@Override
public void checkpoint(Checkpoint checkpoint) throws Exception {
simpleClient.checkpoint(checkpoint);
}

@Override
public void drain() throws Exception {
simpleClient.drain();

}

@Override
public void reset(Checkpoint checkpoint) throws Exception {
simpleClient.reset(checkpoint);
}

@Override
public void resetToInitialState() throws Exception {
simpleClient.resetToInitialState();
}

@Override
public void retireCheckpoint(long id) throws Exception {
simpleClient.retireCheckpoint(id);
}

}

Loading

0 comments on commit 1083750

Please sign in to comment.