diff --git a/com.ibm.streamsx.messaging/icons/KafkaConsumer_deprecated_16.gif b/com.ibm.streamsx.messaging/icons/KafkaConsumer_deprecated_16.gif new file mode 100644 index 0000000..fb16806 Binary files /dev/null and b/com.ibm.streamsx.messaging/icons/KafkaConsumer_deprecated_16.gif differ diff --git a/com.ibm.streamsx.messaging/icons/KafkaConsumer_deprecated_32.gif b/com.ibm.streamsx.messaging/icons/KafkaConsumer_deprecated_32.gif new file mode 100644 index 0000000..606728d Binary files /dev/null and b/com.ibm.streamsx.messaging/icons/KafkaConsumer_deprecated_32.gif differ diff --git a/com.ibm.streamsx.messaging/icons/KafkaProducer_deprecated_16.gif b/com.ibm.streamsx.messaging/icons/KafkaProducer_deprecated_16.gif new file mode 100644 index 0000000..ea0e6a5 Binary files /dev/null and b/com.ibm.streamsx.messaging/icons/KafkaProducer_deprecated_16.gif differ diff --git a/com.ibm.streamsx.messaging/icons/KafkaProducer_deprecated_32.gif b/com.ibm.streamsx.messaging/icons/KafkaProducer_deprecated_32.gif new file mode 100644 index 0000000..0d6254f Binary files /dev/null and b/com.ibm.streamsx.messaging/icons/KafkaProducer_deprecated_32.gif differ diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSink.java index cf1a994..9c94099 100755 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSink.java @@ -31,9 +31,17 @@ description="The tuples arriving on this port are expected to contain three attributes \\\"key\\\", \\\"topic\\\" and \\\"message\\\". " + "Out of these \\\"message\\\", is a required attribute.")) @PrimitiveOperator(name=KafkaSink.OPER_NAME, description=KafkaSink.DESC) -@Icons(location16="icons/KafkaProducer_16.gif", location32="icons/KafkaProducer_32.gif") +@Icons(location16="icons/KafkaProducer_deprecated_16.gif", location32="icons/KafkaProducer_deprecated_32.gif") public class KafkaSink extends KafkaBaseOper { + + private static final String DEPRECATION_MESSAGE = "The `com.ibm.streamsx.messaging.kafka.KafkaProducer` operator is " + + "deprecated and is replaced by the `com.ibm.streamsx.kafka.KafkaProducer` " + + "operator in the `com.ibm.streamsx.kafka` toolkit. The deprecated operator " + + "might be removed in a future release."; + static { + System.err.println(KafkaSink.DEPRECATION_MESSAGE); + } static final String OPER_NAME = "KafkaProducer"; //$NON-NLS-1$ @@ -56,7 +64,7 @@ public static boolean topicChecker(OperatorContextChecker checker) { return checker.checkExcludedParameters("topic", "topicAttribute") && //$NON-NLS-1$ //$NON-NLS-2$ checker.checkExcludedParameters("topicAttribute", "topic"); //$NON-NLS-1$ //$NON-NLS-2$ } - + //consistent region checks @ContextCheck(compile = true) public static void checkInConsistentRegion(OperatorContextChecker checker) { @@ -80,6 +88,8 @@ public static void checkIncomingMessageAttribute(OperatorContextChecker checker) @Override public void initialize(OperatorContext context) throws Exception { + trace.log(TraceLevel.ERROR, KafkaSink.DEPRECATION_MESSAGE); + super.initialize(context); super.initSchema(getInput(0).getStreamSchema()); @@ -166,7 +176,8 @@ private void resetProducerClient(OperatorContext context) throws FileNotFoundExc producerClient = getNewProducerClient(topicAH, keyAH, messageAH, finalProperties); } - public static final String DESC = + public static final String DESC = "**DEPRECATED**: " + KafkaSink.DEPRECATION_MESSAGE + "\\n" + //$NON-NLS-1$ + "\\n" + //$NON-NLS-1$ "This operator acts as a Kafka producer sending tuples as messages to a Kafka broker. " + //$NON-NLS-1$ "The broker is assumed to be already configured and running. " + //$NON-NLS-1$ "The incoming stream can have three attributes: topic, key and message. " + //$NON-NLS-1$ diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSource.java index e51693c..273f367 100755 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/kafka/KafkaSource.java @@ -51,10 +51,18 @@ @OutputPorts(@OutputPortSet(cardinality=1, optional=false, description="Messages received from Kafka are sent on this output port.")) @PrimitiveOperator(name=KafkaSource.OPER_NAME, description=KafkaSource.DESC) -@Icons(location16="icons/KafkaConsumer_16.gif", location32="icons/KafkaConsumer_32.gif") +@Icons(location16="icons/KafkaConsumer_deprecated_16.gif", location32="icons/KafkaConsumer_deprecated_32.gif") public class KafkaSource extends KafkaBaseOper implements StateHandler{ + private static final String DEPRECATION_MESSAGE = "The `com.ibm.streamsx.messaging.kafka.KafkaConsumer` operator is " + + "deprecated and is replaced by the `com.ibm.streamsx.kafka.KafkaConsumer` " + + "operator in the `com.ibm.streamsx.kafka` toolkit. The deprecated operator " + + "might be removed in a future release."; + static { + System.err.println(KafkaSource.DEPRECATION_MESSAGE); + } + static final String OPER_NAME = "KafkaConsumer"; //$NON-NLS-1$ // private int threadsPerTopic = 1; private List partitions = new ArrayList(); @@ -89,8 +97,6 @@ public class KafkaSource extends KafkaBaseOper implements StateHandler{ private Map startingOffsetsMetrics; private Map ckptOffsetsMetrics; private Map regionCkptOffsetsMetrics; - - //consistent region checks @ContextCheck(compile = true) @@ -135,6 +141,8 @@ public static void checkIncomingMessageAttribute(OperatorContextChecker checker) @Override public void initialize(OperatorContext context) throws Exception { + trace.log(TraceLevel.ERROR, KafkaSource.DEPRECATION_MESSAGE); + super.initialize(context); super.initSchema(getOutput(0).getStreamSchema()); @@ -390,7 +398,9 @@ public void setPartition(int[] values) { } } - public static final String DESC = "This operator acts as a Kafka consumer receiving messages for one or more topics. " //$NON-NLS-1$ + public static final String DESC = "**DEPRECATED**: " + KafkaSource.DEPRECATION_MESSAGE + "\\n" //$NON-NLS-1$ + + "\\n" //$NON-NLS-1$ + + "This operator acts as a Kafka consumer receiving messages for one or more topics. " //$NON-NLS-1$ + "Ordering of messages is only guaranteed per Kafka topic partition. " + BASE_DESC + // common //$NON-NLS-1$ // description // between diff --git a/com.ibm.streamsx.messaging/info.xml b/com.ibm.streamsx.messaging/info.xml index 52a102b..bd2dae7 100644 --- a/com.ibm.streamsx.messaging/info.xml +++ b/com.ibm.streamsx.messaging/info.xml @@ -680,7 +680,7 @@ The <attribute> element has three possible attributes: * composite types * xml - 5.3.1 + 5.3.2 4.2.0.0