Single node code example
David Prat edited this page Sep 6, 2013
·
12 revisions
This example implements a prime number generator. A topology with one spout that emits the natural numbers in order and one bolt that checks which of them are prime is created and launched on a single-node Storm installation.
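// Package names for Storm releases before 1.0; later releases use org.apache.storm
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.topology.TopologyBuilder;

public class PrimeNumberTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new PrimeNumberSpout());
        builder.setBolt("prime", new PrimeNumberBolt())
               .shuffleGrouping("spout");

        // This is how a configuration is created to run on a cluster
        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(1);
        conf.setMaxSpoutPending(5000);

        try {
            // args[0] is the topology name
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } catch (AlreadyAliveException e) {
            // A topology with this name is already running
            System.err.println("Topology " + args[0] + " is already running");
        }
    }
}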
- builder.setSpout("spout", new PrimeNumberSpout()): This call registers one spout with the ID "spout".
- builder.setBolt("prime", new PrimeNumberBolt()).shuffleGrouping("spout"): This call registers a bolt with the ID "prime". The chained shuffleGrouping call tells the runtime that tuples from the spout "spout" are distributed randomly among the bolt's tasks.
- StormSubmitter.submitTopology(args[0], conf, builder.createTopology()): This method submits the topology to Storm's Nimbus process with the specified configuration. Here the topology name is taken from the first command-line argument passed to the program.
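To make the idea of a shuffle grouping more concrete, here is a minimal sketch in plain Java with no Storm dependency. It assumes that each tuple is simply sent to a task chosen uniformly at random. Storm's real implementation may balance the load differently, and the class name ShuffleGroupingSketch, the method distribute and the seed parameter are hypothetical names used only for illustration.

```java
import java.util.Random;

public class ShuffleGroupingSketch {
    // Assumption: every tuple goes to a task picked uniformly at random.
    // Returns how many tuples each task received.
    public static int[] distribute(int tupleCount, int taskCount, long seed) {
        Random random = new Random(seed);
        int[] received = new int[taskCount];
        for (int i = 0; i < tupleCount; i++) {
            int task = random.nextInt(taskCount); // index in [0, taskCount)
            received[task]++;
        }
        return received;
    }

    public static void main(String[] args) {
        int[] received = distribute(1000, 4, 42L);
        for (int task = 0; task < received.length; task++) {
            System.out.println("task " + task + " received " + received[task] + " tuples");
        }
    }
}
```

With enough tuples, every task should end up with a roughly similar share, which is why this grouping suits stateless bolts such as a primality check.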
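// Package names for Storm releases before 1.0; later releases use org.apache.storm
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.util.Map;

public class PrimeNumberSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    // Instance field, so each spout task keeps its own counter
    private int currentNumber = 1;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Emit the next number (autoboxed to Integer)
        collector.emit(new Values(currentNumber++));
    }

    @Override
    public void ack(Object id) {
    }

    @Override
    public void fail(Object id) {
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }
}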
- The open method is called once when the spout is created. The collector is the object through which the spout emits tuples.
- nextTuple is called repeatedly. Each time, the method checks whether there is a tuple to emit and returns if there is none. In this case the spout generates the tuple itself: it emits the current natural number to the bolt through the emit method and then increments it.
- The other main methods on spouts are ack and fail. Storm calls them when it detects that a tuple emitted from the spout was either fully processed by the topology or failed to complete. ack and fail are only called for reliable spouts, that is, spouts that emit each tuple with a message ID.
- declareOutputFields is an important method because it defines how many fields each emitted tuple has and what they are called. Bolts read the tuple according to this structure.
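The call order described above (open once, then nextTuple repeatedly) can be sketched without Storm. In the following plain-Java example, RecordingCollector and NumberSource are hypothetical stand-ins for SpoutOutputCollector and PrimeNumberSpout, and the loop in run plays the role of the Storm runtime. It illustrates the contract only and does not show how Storm schedules spouts internally.

```java
import java.util.ArrayList;
import java.util.List;

public class SpoutLifecycleSketch {
    // Hypothetical stand-in for SpoutOutputCollector: it only records emitted values.
    static class RecordingCollector {
        final List<Integer> emitted = new ArrayList<>();

        void emit(int value) {
            emitted.add(value);
        }
    }

    // Hypothetical stand-in for PrimeNumberSpout, without any Storm dependency.
    static class NumberSource {
        private RecordingCollector collector;
        private int currentNumber = 1;

        void open(RecordingCollector collector) {
            this.collector = collector;
        }

        void nextTuple() {
            collector.emit(currentNumber++);
        }
    }

    // Plays the role of the runtime: open once, then call nextTuple repeatedly.
    public static List<Integer> run(int calls) {
        RecordingCollector collector = new RecordingCollector();
        NumberSource source = new NumberSource();
        source.open(collector);
        for (int i = 0; i < calls; i++) {
            source.nextTuple();
        }
        return collector.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(5)); // prints [1, 2, 3, 4, 5]
    }
}
```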
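// Package names for Storm releases before 1.0; later releases use org.apache.storm
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import java.util.Map;

public class PrimeNumberBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Field 0 is the "number" field declared by the spout
        int number = tuple.getInteger(0);
        if (isPrime(number)) {
            System.out.println(number);
        }
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("number"));
    }

    private boolean isPrime(int n) {
        // 0, 1 and negative numbers are not prime
        if (n < 2) {
            return false;
        }
        if (n == 2 || n == 3) {
            return true;
        }
        if (n % 2 == 0) {
            return false;
        }
        // Trial division by odd numbers; long avoids overflow of i * i
        for (long i = 3; i * i <= n; i += 2) {
            if (n % i == 0) {
                return false;
            }
        }
        return true;
    }
}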
- The focus here is on the execute method, which performs the computation on each received tuple. In this case the number is extracted from the tuple and tested for primality. For each tuple processed, the ack method has to be called so that the spout that emitted the tuple can be informed that it was processed correctly.
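The primality test can be exercised on its own, without a running topology, which is a convenient way to check the logic before deploying. The following is a minimal sketch in plain Java. The class name PrimeCheckSketch and the helper primesUpTo are hypothetical, and the sketch treats 1 as not prime, following the usual definition.

```java
import java.util.ArrayList;
import java.util.List;

public class PrimeCheckSketch {
    // Trial division by odd numbers up to the square root of n.
    public static boolean isPrime(int n) {
        if (n < 2) {
            return false; // 0, 1 and negative numbers are not prime
        }
        if (n < 4) {
            return true; // 2 and 3
        }
        if (n % 2 == 0) {
            return false;
        }
        for (long i = 3; i * i <= n; i += 2) { // long avoids overflow of i * i
            if (n % i == 0) {
                return false;
            }
        }
        return true;
    }

    // Collects every prime in the range [1, limit].
    public static List<Integer> primesUpTo(int limit) {
        List<Integer> primes = new ArrayList<>();
        for (int n = 1; n <= limit; n++) {
            if (isPrime(n)) {
                primes.add(n);
            }
        }
        return primes;
    }

    public static void main(String[] args) {
        System.out.println(primesUpTo(30)); // prints [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
    }
}
```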