Skip to content

Commit

Permalink
Merge pull request #119 from Alex-Cook4/master
Browse files Browse the repository at this point in the history
Adding sample and fixing makefiles
  • Loading branch information
chanskw committed Aug 14, 2015
2 parents 1083750 + b403d59 commit 39bd062
Show file tree
Hide file tree
Showing 17 changed files with 62,718 additions and 15 deletions.
2 changes: 1 addition & 1 deletion samples/KafkaConsistentRegionConsumerParallel/.project
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>KafkaConsistentRegionConsumerParallel</name>
<name>KafkaConsistentRegionParallelConsumers</name>
<comment></comment>
<projects>
</projects>
Expand Down
6 changes: 3 additions & 3 deletions samples/KafkaConsistentRegionConsumerParallel/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

TOOLKIT_NAME=com.ibm.streamsx.messaging
STREAMS_MESSAGING_TOOLKIT ?= $(shell ([ -e ../../$(TOOLKIT_NAME)/toolkit.xml ] && echo ../../$(TOOLKIT_NAME)) ||\
echo $(STREAMS_STUDIO_SPL_PATH) ||\
([ -e "../$(TOOLKIT_NAME)" ] && echo ../$(TOOLKIT_NAME)) ||\
echo $(STREAMS_INSTALL)/toolkits/$(TOOLKIT_NAME))

Expand All @@ -13,7 +14,7 @@ SPLC_FLAGS ?= -a --data-directory data
SPLC = $(STREAMS_INSTALL)/bin/sc

SPL_CMD_ARGS ?= -t $(STREAMS_MESSAGING_TOOLKIT)
SPL_MAIN_COMPOSITE = application::ConsistentRegionConsumerParallel
SPL_MAIN_COMPOSITE = application::ConsistentRegionParallelConsumers

all: distributed

Expand All @@ -27,5 +28,4 @@ distributed: data
$(SPLC) $(SPLC_FLAGS) -M $(SPL_MAIN_COMPOSITE) $(SPL_CMD_ARGS)

clean:
$(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE)
rm data/*.out
$(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE)
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ use com.ibm.streamsx.messaging.kafka::* ;
* Consistent Region does not support Standalone mode, so this sample is only interesting in
* Distributed mode.
*
*
* TO REMOVE CONSISTENT REGION: remove @consistent annotations (there are two) and
* delete JobControlPlane operator.
*/
composite ConsistentRegionConsumerParallel
composite ConsistentRegionParallelConsumers
{
graph
//generate data to be written to a kafka server
Expand All @@ -49,7 +50,7 @@ composite ConsistentRegionConsumerParallel
propertiesFile : "etc/producer.properties" ;
}

//Read in from a kafka server and start consistent region
//Read in from a kafka server with a 3-partition topic and start consistent region
@parallel(width = 3) @consistent(trigger = periodic, period = 5.0)
stream<rstring message, rstring key> KafkaConsumerOut = KafkaConsumer()
{
Expand Down
2 changes: 1 addition & 1 deletion samples/KafkaConsistentRegionConsumerParallel/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<info:name>ConsistentRegionConsumerParallel</info:name>
<info:description></info:description>
<info:version>1.0.0</info:version>
<info:requiredProductVersion>4.0.1.0</info:requiredProductVersion>
<info:requiredProductVersion>4.0.0.0</info:requiredProductVersion>
</info:identity>
<info:dependencies>
<info:toolkit>
Expand Down
2 changes: 1 addition & 1 deletion samples/KafkaConsistentRegionConsumerSimple/.project
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>KafkaConsistentRegionConsumerSimple</name>
<name>KafkaConsistentRegionSimple</name>
<comment></comment>
<projects>
</projects>
Expand Down
8 changes: 4 additions & 4 deletions samples/KafkaConsistentRegionConsumerSimple/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

TOOLKIT_NAME=com.ibm.streamsx.messaging
STREAMS_MESSAGING_TOOLKIT ?= $(shell ([ -e ../../$(TOOLKIT_NAME)/toolkit.xml ] && echo ../../$(TOOLKIT_NAME)) ||\
echo $(STREAMS_STUDIO_SPL_PATH) ||\
([ -e "../$(TOOLKIT_NAME)" ] && echo ../$(TOOLKIT_NAME)) ||\
echo $(STREAMS_INSTALL)/toolkits/$(TOOLKIT_NAME))

Expand All @@ -13,10 +14,10 @@ SPLC_FLAGS ?= -a --data-directory data
SPLC = $(STREAMS_INSTALL)/bin/sc

SPL_CMD_ARGS ?= -t $(STREAMS_MESSAGING_TOOLKIT)
SPL_MAIN_COMPOSITE = application::ConsistentRegionConsumerSimple
SPL_MAIN_COMPOSITE = application::ConsistentRegionSimple

all: distributed

data:
mkdir data

Expand All @@ -27,5 +28,4 @@ distributed: data
$(SPLC) $(SPLC_FLAGS) -M $(SPL_MAIN_COMPOSITE) $(SPL_CMD_ARGS)

clean:
$(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE)
rm data/*.out
$(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE)
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use com.ibm.streamsx.messaging.kafka::* ;
*
*
*/
composite ConsistentRegionConsumerSimple
composite ConsistentRegionSimple
{
graph
//generate data to be written to a kafka server
Expand Down
2 changes: 1 addition & 1 deletion samples/KafkaConsistentRegionConsumerSimple/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<info:name>ConsistentRegionConsumerSimple</info:name>
<info:description></info:description>
<info:version>1.0.0</info:version>
<info:requiredProductVersion>4.0.1.0</info:requiredProductVersion>
<info:requiredProductVersion>4.0.0.0</info:requiredProductVersion>
</info:identity>
<info:dependencies>
<info:toolkit>
Expand Down
11 changes: 11 additions & 0 deletions samples/KafkaParallelConsumers/.classpath
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Eclipse JDT classpath for the KafkaParallelConsumers sample:
     pulls in the Streams Java operator API container, the default JRE,
     and the annotation-processor output folder (.apt_generated). -->
<classpath>
<classpathentry exported="true" kind="con" path="com.ibm.streams.java/com.ibm.streams.operator"/>
<classpathentry exported="true" kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="src" path=".apt_generated">
<attributes>
<!-- optional: do not fail the build if the generated-source folder is absent -->
<attribute name="optional" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="output" path="output"/>
</classpath>
29 changes: 29 additions & 0 deletions samples/KafkaParallelConsumers/.project
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Eclipse project descriptor for the KafkaParallelConsumers sample.
     Registers the Java, SPL, and Xtext builders/natures so the project
     builds in Streams Studio. -->
<projectDescription>
<name>KafkaParallelConsumers</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>com.ibm.streams.studio.splproject.builder.SPLProjectBuilder</name>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.xtext.ui.shared.xtextBuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>com.ibm.streams.studio.splproject.SPLProjectNature</nature>
<nature>org.eclipse.xtext.ui.shared.xtextNature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
31 changes: 31 additions & 0 deletions samples/KafkaParallelConsumers/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright (C) 2015, International Business Machines Corporation
# All Rights Reserved

# standalone/distributed run the compiler every time; list them as phony
# alongside all/clean so same-named files can never mask them.
.PHONY: all standalone distributed clean

TOOLKIT_NAME=com.ibm.streamsx.messaging
# Locate the messaging toolkit, first match wins:
#   1. a built sibling checkout (../../<toolkit>/toolkit.xml present),
#   2. the Streams Studio workspace path, if set,
#   3. a peer directory next to this sample,
#   4. the toolkit shipped with the Streams install.
STREAMS_MESSAGING_TOOLKIT ?= $(shell ([ -e ../../$(TOOLKIT_NAME)/toolkit.xml ] && echo ../../$(TOOLKIT_NAME)) ||\
	echo $(STREAMS_STUDIO_SPL_PATH) ||\
	([ -e "../$(TOOLKIT_NAME)" ] && echo ../$(TOOLKIT_NAME)) ||\
	echo $(STREAMS_INSTALL)/toolkits/$(TOOLKIT_NAME))


SPLC_FLAGS ?= -a --data-directory data
SPLC = $(STREAMS_INSTALL)/bin/sc

SPL_CMD_ARGS ?= -t $(STREAMS_MESSAGING_TOOLKIT)
SPL_MAIN_COMPOSITE = application::ParallelConsumers

all: distributed

# Create the data directory consumed via --data-directory.
# -p makes the target idempotent instead of erroring when it already exists.
data:
	mkdir -p data

standalone: data
	$(SPLC) $(SPLC_FLAGS) -T -M $(SPL_MAIN_COMPOSITE) $(SPL_CMD_ARGS)

distributed: data
	$(SPLC) $(SPLC_FLAGS) -M $(SPL_MAIN_COMPOSITE) $(SPL_CMD_ARGS)

clean:
	$(SPLC) $(SPLC_FLAGS) -C -M $(SPL_MAIN_COMPOSITE)
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
namespace application ;

use com.ibm.streamsx.messaging.kafka::* ;
/**
 * Read from a three-partition Kafka topic using the KafkaConsumer operator in a parallel region
 * 3-wide. Kafka only guarantees ordering of tuples within a single partition. This application
 * provides the same guarantee, but since we are reading from three separate partitions,
 * we can lose overall topic order. Depending on the key/message, order can be recovered after consuming
 * from a Kafka Server similar to how it is done here: https://developer.ibm.com/streamsdev/docs/parallelized-file-processing-parse-operator/
 *
 *
 * Make sure you have created your topic before launching:
 * bin/kafka-topics.sh --create --zookeeper <zk.Host.1>:2181 --partitions 3 --topic myParallelTopic
 *
 * Edit the consumer.properties and producer.properties files found in the etc directory to include
 * your Kafka properties.
 *
 * Build using Studio or the provided Makefile.
 *
 * Check results by looking at messagesReceived.out in the data directory.
 *
 */
composite ParallelConsumers
{
	graph
	//Generate test data to be written to the Kafka server: one tuple every
	//0.25s after a 4s startup delay; the key is a random digit 0-9 as a string.
		stream<rstring topic, rstring key, rstring message> OutputStream = Beacon()
		{
			param
				period : 0.25 ;
				initDelay : 4.0 ;
			output
				OutputStream : topic = "myParallelTopic", message =(rstring)
					IterationCount(), key =(rstring)(int32)(random() * 10.0) ;
		}

		//Write the generated tuples to the Kafka server
		() as KafkaSinkOp = KafkaProducer(OutputStream)
		{
			param
				propertiesFile : "etc/producer.properties" ;
		}

		//Consume the 3-partition topic with a 3-wide parallel region:
		//each channel reads exactly one partition (partition == channel index).
		//Unlike the ConsistentRegion* samples, there is no @consistent
		//annotation here — this sample demonstrates parallel consumption only.
		@parallel(width = 3)
		stream<rstring message, rstring key> KafkaConsumerOut = KafkaConsumer()
		{
			param
				propertiesFile : "etc/consumer.properties" ;
				topic : "myParallelTopic" ;
				partition : getChannel() ;
			}


		//Write every consumed tuple to data/messagesReceived.out as CSV,
		//flushing after each tuple so progress is visible immediately.
		() as MessagePrinter = FileSink(KafkaConsumerOut)
		{
			param
				file : "messagesReceived.out" ;
				flush : 1u ;
				format : csv ;
		}

		//NOTE(review): JobControlPlane is normally paired with a consistent
		//region, but no @consistent annotation exists in this graph —
		//presumably kept for parity with the consistent-region samples;
		//confirm whether it can be removed.
		() as JCP = JobControlPlane()
		{
		}

}


Loading

0 comments on commit 39bd062

Please sign in to comment.