Skip to content

Commit

Permalink
Merge pull request #317 from cancilla/master
Browse files Browse the repository at this point in the history
Deprecate Kafka operators -- ref #316
  • Loading branch information
schulz2 authored Sep 4, 2017
2 parents d6a737c + b98d285 commit c701cab
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 8 deletions.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,17 @@
description="The tuples arriving on this port are expected to contain three attributes \\\"key\\\", \\\"topic\\\" and \\\"message\\\". " +
"Out of these \\\"message\\\", is a required attribute."))
@PrimitiveOperator(name=KafkaSink.OPER_NAME, description=KafkaSink.DESC)
@Icons(location16="icons/KafkaProducer_16.gif", location32="icons/KafkaProducer_32.gif")
@Icons(location16="icons/KafkaProducer_deprecated_16.gif", location32="icons/KafkaProducer_deprecated_32.gif")
public class KafkaSink extends KafkaBaseOper {

private static final String DEPRECATION_MESSAGE = "The `com.ibm.streamsx.messaging.kafka.KafkaProducer` operator is "
+ "deprecated and is replaced by the `com.ibm.streamsx.kafka.KafkaProducer` "
+ "operator in the `com.ibm.streamsx.kafka` toolkit. The deprecated operator "
+ "might be removed in a future release.";

static {
System.err.println(KafkaSink.DEPRECATION_MESSAGE);
}

static final String OPER_NAME = "KafkaProducer"; //$NON-NLS-1$

Expand All @@ -56,7 +64,7 @@ public static boolean topicChecker(OperatorContextChecker checker) {
return checker.checkExcludedParameters("topic", "topicAttribute") && //$NON-NLS-1$ //$NON-NLS-2$
checker.checkExcludedParameters("topicAttribute", "topic"); //$NON-NLS-1$ //$NON-NLS-2$
}

//consistent region checks
@ContextCheck(compile = true)
public static void checkInConsistentRegion(OperatorContextChecker checker) {
Expand All @@ -80,6 +88,8 @@ public static void checkIncomingMessageAttribute(OperatorContextChecker checker)
@Override
public void initialize(OperatorContext context) throws Exception
{
trace.log(TraceLevel.ERROR, KafkaSink.DEPRECATION_MESSAGE);

super.initialize(context);
super.initSchema(getInput(0).getStreamSchema());

Expand Down Expand Up @@ -166,7 +176,8 @@ private void resetProducerClient(OperatorContext context) throws FileNotFoundExc
producerClient = getNewProducerClient(topicAH, keyAH, messageAH, finalProperties);
}

public static final String DESC =
public static final String DESC = "**DEPRECATED**: " + KafkaSink.DEPRECATION_MESSAGE + "\\n" + //$NON-NLS-1$
"\\n" + //$NON-NLS-1$
"This operator acts as a Kafka producer sending tuples as messages to a Kafka broker. " + //$NON-NLS-1$
"The broker is assumed to be already configured and running. " + //$NON-NLS-1$
"The incoming stream can have three attributes: topic, key and message. " + //$NON-NLS-1$
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,18 @@
@OutputPorts(@OutputPortSet(cardinality=1, optional=false,
description="Messages received from Kafka are sent on this output port."))
@PrimitiveOperator(name=KafkaSource.OPER_NAME, description=KafkaSource.DESC)
@Icons(location16="icons/KafkaConsumer_16.gif", location32="icons/KafkaConsumer_32.gif")
@Icons(location16="icons/KafkaConsumer_deprecated_16.gif", location32="icons/KafkaConsumer_deprecated_32.gif")
public class KafkaSource extends KafkaBaseOper implements StateHandler{

private static final String DEPRECATION_MESSAGE = "The `com.ibm.streamsx.messaging.kafka.KafkaConsumer` operator is "
+ "deprecated and is replaced by the `com.ibm.streamsx.kafka.KafkaConsumer` "
+ "operator in the `com.ibm.streamsx.kafka` toolkit. The deprecated operator "
+ "might be removed in a future release.";

static {
System.err.println(KafkaSource.DEPRECATION_MESSAGE);
}

static final String OPER_NAME = "KafkaConsumer"; //$NON-NLS-1$
// private int threadsPerTopic = 1;
private List<Integer> partitions = new ArrayList<Integer>();
Expand Down Expand Up @@ -89,8 +97,6 @@ public class KafkaSource extends KafkaBaseOper implements StateHandler{
private Map<Integer,Metric> startingOffsetsMetrics;
private Map<Integer,Metric> ckptOffsetsMetrics;
private Map<Integer,Metric> regionCkptOffsetsMetrics;



//consistent region checks
@ContextCheck(compile = true)
Expand Down Expand Up @@ -135,6 +141,8 @@ public static void checkIncomingMessageAttribute(OperatorContextChecker checker)
@Override
public void initialize(OperatorContext context)
throws Exception {
trace.log(TraceLevel.ERROR, KafkaSource.DEPRECATION_MESSAGE);

super.initialize(context);
super.initSchema(getOutput(0).getStreamSchema());

Expand Down Expand Up @@ -390,7 +398,9 @@ public void setPartition(int[] values) {
}
}

public static final String DESC = "This operator acts as a Kafka consumer receiving messages for one or more topics. " //$NON-NLS-1$
public static final String DESC = "**DEPRECATED**: " + KafkaSource.DEPRECATION_MESSAGE + "\\n" //$NON-NLS-1$
+ "\\n" //$NON-NLS-1$
+ "This operator acts as a Kafka consumer receiving messages for one or more topics. " //$NON-NLS-1$
+ "Ordering of messages is only guaranteed per Kafka topic partition. " + BASE_DESC + // common //$NON-NLS-1$
// description
// between
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.messaging/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ The &lt;attribute> element has three possible attributes:
* composite types
* xml
</info:description>
<info:version>5.3.1</info:version>
<info:version>5.3.2</info:version>
<info:requiredProductVersion>4.2.0.0</info:requiredProductVersion>
</info:identity>
<info:dependencies/>
Expand Down

0 comments on commit c701cab

Please sign in to comment.