
Examples

John Schavemaker edited this page Jan 24, 2015 · 27 revisions

Have you read the Getting Started page yet? If not, please do so before reading this page, since it describes the steps to set up the examples on your machine and run Example 1!

Most of the examples require the right OpenCV library to be loaded, which can be specified in the StormCVConfig like this:

// the OpenCV library to be used is defined globally in Storm's configuration
conf.put(StormCVConfig.STORMCV_OPENCV_LIB, "mac64_opencv_java248.dylib");

The following libraries can be used:

  • win32_opencv_java248.dll
  • win64_opencv_java248.dll
  • mac64_opencv_java248.dylib
  • linux64_opencv_java248.so (works on Ubuntu 12.04 and 14.04 LTS)

If STORMCV_OPENCV_LIB has not been specified, StormCV will attempt to load the right library for the platform it is being executed on. It is advised to specify the library explicitly whenever possible to avoid linking exceptions.
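As an illustration of that fallback, the library name could be derived from the JVM's os.name and os.arch system properties. The helper below is a hypothetical sketch, not part of StormCV:

```java
// Hypothetical sketch (not StormCV code): derive the OpenCV library name
// from the JVM's os.name and os.arch system properties. The "248" version
// suffix matches the libraries listed above.
public class OpenCVLibSelector {

    public static String selectLib(String osName, String osArch) {
        boolean is64bit = osArch.contains("64");
        String os = osName.toLowerCase();
        if (os.contains("win")) {
            return (is64bit ? "win64" : "win32") + "_opencv_java248.dll";
        } else if (os.contains("mac")) {
            return "mac64_opencv_java248.dylib";
        } else {
            return "linux64_opencv_java248.so"; // assume 64-bit Linux otherwise
        }
    }
}
```

Such a helper could then feed the configuration shown above, e.g. `conf.put(StormCVConfig.STORMCV_OPENCV_LIB, OpenCVLibSelector.selectLib(System.getProperty("os.name"), System.getProperty("os.arch")));`.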

Please note that these examples have been designed to run on localhost and thus require only a small number of spouts/bolts. The example topologies can be enlarged by adding more data to process and increasing the parallelism hints of spouts and bolts. This can be done on localhost as well, but on a single machine it will only increase the number of threads and will not give any performance boost.

Example 1 - scaling and grayscale

This example is covered in the Getting Started page.

Example 2 - face detection

The topology in this example simply reads a directory containing a couple of images, performs face detection, extracts the faces from the frames, draws the detected faces into the images, and stores the results in a local directory (see figure below).

[figure: face detection topology]

The following snippet shows the most important lines of code; please see Example 1 or the code in the examples package for how to run it locally. It is also possible to read a directory of images from a remote location, for example an Amazon S3 bucket, by providing a URL such as s3://bucket/path/to/dir. This topology writes images to /output/facedetections, so check that directory to see the progress of the topology. Since topologies run forever, even when there is no data, there is no indication that all data has been processed!

Please see FacedetectionTopology.java within the examples for the full code.

//  list with files to process
List<String> files = new ArrayList<String>(); 
files.add( "file://path/to/directory" );
	
TopologyBuilder builder = new TopologyBuilder();
	
// add 1 spout with ImageFetcher reading the images from the provided paths
builder.setSpout("spout", new CVParticleSpout(new ImageFetcher(files).sleep(100)), 1 ); // sleep is optional 
	
// add 1 bolt with cascade classifier initialized with face model. This operation will output the full frame with face features inside
builder.setBolt("face_detect",new SingleInputBolt( 
	new HaarCascadeOperation("face","lbpcascade_frontalface.xml").outputFrame(true)
), 1).shuffleGrouping("spout");
	
// extracts detected faces from the frames (based on the bounding box)
builder.setBolt("face_extract", new SingleInputBolt(
	new ROIExtractionOperation("face").spacing(25)
), 1).shuffleGrouping("face_detect");
	
// add 1 drawer that simply draws features into a frame and stores them locally (used for testing)
builder.setBolt("drawer", new SingleInputBolt(
	new DrawFeaturesOperation().destination("file://path/to/output/directory")
), 1).shuffleGrouping("face_extract");

The performance of this topology can be improved by increasing the parallelism of the slowest step in the topology, in this example the face detection bolt. The number of instances can easily be increased by changing the 'parallelism hint' for this bolt as shown in the line below (changed from 1 to 8). Each frame from the spout is then randomly sent to one of the eight face detection bolt instances.

builder.setBolt("face_detect_bolt", new SingleInputBolt(
	new HaarCascadeOperation("face", "lbpcascade_frontalface.xml").outputFrame(true)
), 8).shuffleGrouping("spout"); // <-- the '8' in this line indicates the number of bolt instances

Example 3 - extracting multiple features

This example reads video, performs two types of feature extraction in parallel, and combines the results afterwards. Combining simply means putting the features into the same Frame object before sending the total result to the next bolt. Such a topology would typically end with a bolt that stores this combined result, but that is not important for this example. As in Example 1, the output can be viewed at http://localhost:8558/streaming/tiles, where 'tiles' can be clicked to see the video.

[figure: multiple feature extraction topology]

Note that the Feature Combiner is a BatchOperation that requires 2 Feature objects with the same streamId and sequenceNr (i.e. features belonging to the same frame). The combiner must wait until it has received all required features; hence this topology runs as slowly as its slowest feature extractor, which is typically the SIFT operation. For optimal operation of the topology it is important to configure a good parallelism hint for each bolt. The right number of bolts can be estimated locally but is likely to change somewhat when deployed on the cluster (due to different hardware etc.). In this example it makes sense to configure four times more SIFT bolts than face detection bolts. The code snippet below only covers the topology from Scale to Combiner; please see MultipleFeaturesTopology.java within the examples for the full code.

// add bolt that scales frames down to 25% of the original size 
builder.setBolt("scale", new SingleInputBolt( new ScaleImageOperation(0.25f)), 1)
	.shuffleGrouping("spout");

// one bolt with a HaarCascade classifier detecting faces. This operation outputs a Frame including the Features with detected faces.
// lbpcascade_frontalface.xml must be present on the classpath!
builder.setBolt("face", new SingleInputBolt( 
	new HaarCascadeOperation("face", "lbpcascade_frontalface.xml").outputFrame(true)),1)
.shuffleGrouping("scale");
	
// add a bolt that performs SIFT keypoint extraction which emits Feature objects instead of Frame
builder.setBolt("sift", new SingleInputBolt( 
	new FeatureExtractionOperation("sift", FeatureDetector.SIFT, DescriptorExtractor.SIFT).outputFrame(false)), 2)
.shuffleGrouping("scale");
		
// Batch bolt that waits for input from both the face and sift detection bolts and combines them in a single frame object
builder.setBolt("combiner", new BatchInputBolt(
	new SequenceNrBatcher(2), 
	new FeatureCombinerOperation()), 1)
.fieldsGrouping("sift", new Fields(FrameSerializer.STREAMID)) //register on sift
.fieldsGrouping("face", new Fields(FrameSerializer.STREAMID)); // register on face
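The wait-until-complete behavior of the combiner can be illustrated with a small self-contained sketch (an illustration of the idea only, not StormCV's actual SequenceNrBatcher implementation): features are buffered per sequence number, and a group is only released once the expected number of features for that sequence number has arrived.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified illustration of batching by sequence number: collect items
// until 'expected' of them share the same sequenceNr, then emit the group.
public class SequenceBatcherSketch {

    private final int expected;
    private final Map<Long, List<String>> buffer = new HashMap<>();

    public SequenceBatcherSketch(int expected) {
        this.expected = expected;
    }

    // Returns the complete group once it is full, or null while still waiting.
    public List<String> add(long sequenceNr, String feature) {
        List<String> group = buffer.computeIfAbsent(sequenceNr, k -> new ArrayList<>());
        group.add(feature);
        if (group.size() == expected) {
            buffer.remove(sequenceNr);
            return group;
        }
        return null;
    }
}
```

In the topology above, `new SequenceNrBatcher(2)` plays this role with an expected count of 2: one SIFT feature and one face feature per frame.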

Example 4 - Sequential operation

An alternative topology that produces the exact same results is shown below. Within this topology the two feature extraction operations are executed sequentially using a SequentialFrameOperation. This minimizes the number of times a frame has to be emitted and avoids the dimensionality problem described before. However, the total execution time for a frame (from fetcher to last operation) is longer. Which alternative is best really depends on the use case. By setting the parallelism hint of the fat bolt to 3, this topology uses exactly the same number of resources as the one in Example 3. Please see FatFeaturesTopology.java within the examples for the full code. The output of this topology can be viewed at http://localhost:8558/streaming/tiles, where 'tiles' can be clicked to see the video.

[figure: sequential ('fat') features topology]

// specify the list with SingleInputOperations to be executed sequentially by the 'fat' bolt
List<SingleInputOperation<CVParticle>> operations = new ArrayList<SingleInputOperation<CVParticle>>();
operations.add(new HaarCascadeOperation("face","lbpcascade_frontalface.xml") );
operations.add(new FeatureExtractionOperation("sift", FeatureDetector.SIFT, DescriptorExtractor.SIFT));
	
// add bolt that scales frames down to 25% of the original size 
builder.setBolt("scale", new SingleInputBolt( new ScaleImageOperation(0.25f)), 1)
.shuffleGrouping("spout");

// A 'fat' bolt containing a SequentialFrameOperation will emit a Frame object containing the detected face and SIFT features
builder.setBolt("fat_features", new SingleInputBolt( 
	new SequentialFrameOperation(operations).outputFrame(true).retainImage(true) ), 3) // three instances of this bolt in the topology!
.shuffleGrouping("scale");

Example 5 - Tiling

StormCV contains a TilingOperation which can split a single frame into a number of possibly overlapping 'tiles'. A high-resolution frame can for example be split into 4 tiles (2x2), each of which can be analyzed in parallel, which can increase the overall processing speed. Effectively each tile (for example the top-left one) gets its own streamId, which can be used to route tiles through the same bolts. The topology below shows a very simple tiling topology where each frame is split into 4 tiles. The top-right and bottom-left tiles are routed to a grayscale operation while the other two are not. The TileMerger stitches the tiles back together and also combines any features the tiles might have (not the case in this example). This results in a stream where half of each frame is grayscale and the other half is full color. This example can be executed without OpenCV. The routing of tiles is done by the DummyTileGrouping present in the examples package, created specially for this example.

[figure: tiling topology]

Output of this TilingTopology is a stream looking like this.

[figure: output frame, half grayscale and half full color]

Please see TilingTopology.java within the examples for the full code and implementation of DummyTileGrouping.

// splits each frame into 4 tiles (2x2) with 0 pixels overlap
builder.setBolt("tiler", new SingleInputBolt(new TilingOperation(2, 2).overlap(0)), 1)
.shuffleGrouping("spout");
	
// execute grayscale operation only on tiles 1 and 2 (top right and bottom left)
builder.setBolt("grayscale", new SingleInputBolt(new GrayscaleOperation()), 1)
.customGrouping("tiler", new DummyTileGrouping(new String[]{"1", "2"}));
	
// The tile merger stitches the tiles back together and more importantly also merges features created for each tile. 
// The result of the merger is almost the same as if the tiling did not take place 
builder.setBolt("tile_merger", new BatchInputBolt(
	new SequenceNrBatcher(4),
	new TilesRecombinerOperation().outputFrame(true)).groupBy(new Fields(FrameSerializer.SEQUENCENR)
), 1).shuffleGrouping("grayscale") // only gets tiles 1 and 2 from grayscale
.customGrouping("tiler", new DummyTileGrouping(new String[]{"0", "3"}));
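The geometry behind a 2x2 split with overlap can be illustrated with a small self-contained sketch (an illustration of the idea only, not the actual TilingOperation): each tile covers width/cols by height/rows pixels, grown by the overlap and clamped to the image bounds.

```java
import java.awt.Rectangle;
import java.util.ArrayList;
import java.util.List;

// Illustration of how tile regions for a cols x rows split could be
// computed: each tile is grown by 'overlap' pixels on every side and
// clamped to the image boundaries.
public class TilingSketch {

    public static List<Rectangle> tiles(int width, int height, int cols, int rows, int overlap) {
        List<Rectangle> result = new ArrayList<>();
        int tileWidth = width / cols, tileHeight = height / rows;
        for (int r = 0; r < rows; r++) {
            for (int c = 0; c < cols; c++) {
                int x1 = Math.max(0, c * tileWidth - overlap);
                int y1 = Math.max(0, r * tileHeight - overlap);
                int x2 = Math.min(width, (c + 1) * tileWidth + overlap);
                int y2 = Math.min(height, (r + 1) * tileHeight + overlap);
                result.add(new Rectangle(x1, y1, x2 - x1, y2 - y1));
            }
        }
        return result;
    }
}
```

For a 1920x1080 frame, `tiles(1920, 1080, 2, 2, 0)` yields four 960x540 tiles in row-major order, matching the 0-3 tile numbering used above (0 = top-left, 3 = bottom-right).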

Example 6 - GroupOfFrames

This example shows the use of the FrameGroupingOperation, which collects a configurable number of frames and puts them in a GroupOfFrames object before sending it to the next bolt. This enables the topology to use BatchOperations without a Batcher, which in turn makes it possible to place these BatchOperations more flexibly (for example using shuffleGrouping). The example topology below shows a pipeline used to: scale down frames, group two subsequent frames, perform dense optical flow on them, convert the optical flow to grayscale, and stream the result. The key element here is the grouping of frames, which is very cheap and can be done by a single bolt, followed by a potentially large number of OpticalFlow operations placed with shuffleGrouping.

[figure: GroupOfFrames topology]

The code below only shows the topology from the FrameGrouper up to the Drawer. Please see BigFlowTopology.java within the examples for the full code and the implementation of the OpticalFlowVisualizeOperation.

// groups two subsequent frames together in a single GroupOfFrames object
builder.setBolt("grouper", new BatchInputBolt(
	new DiscreteWindowBatcher(2, 1), 
	new FrameGrouperOperation()
), 1).fieldsGrouping("scale",new Fields(FrameSerializer.STREAMID));
	
// Two GroupOfFrames operations calculating optical flow on the groups they receive.
// This makes the optical flow stateless!
builder.setBolt("optical_flow", new SingleInputBolt(
	new GroupOfFramesOperation(new OpticalFlowOperation("optical flow"))
), 2).shuffleGrouping("grouper");
	
// Simple operation that visualizes the optical flow and puts it into a Frame object that can be shown / streamed
builder.setBolt("flow_viz", new SingleInputBolt(
	new OpticalFlowVisualizeOperation("optical flow")), 1 )
.shuffleGrouping("optical_flow");
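The pairing performed by the grouper can be illustrated in plain Java (a sketch of the sliding-window idea, assuming `DiscreteWindowBatcher(2, 1)` means a window of 2 frames advancing 1 frame at a time):

```java
import java.util.ArrayList;
import java.util.List;

// Illustration of a sliding window over frame sequence numbers: a window
// of 'size' frames advancing by 'shift' each step. With size=2 and
// shift=1 this pairs frames (0,1), (1,2), (2,3), ... so every frame but
// the first appears in two consecutive groups.
public class SlidingWindowSketch {

    public static List<List<Integer>> windows(List<Integer> frames, int size, int shift) {
        List<List<Integer>> result = new ArrayList<>();
        for (int i = 0; i + size <= frames.size(); i += shift) {
            result.add(new ArrayList<>(frames.subList(i, i + size)));
        }
        return result;
    }
}
```

The overlapping pairs are exactly what the optical flow operation needs: each flow field is computed from a frame and its direct successor.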

Example 7 - FetchAndOperate

StormCV contains a special type of Fetcher that enables the use of an Operation directly within the spout. This approach minimizes data transfer but can only be used if the operation can keep up with the fetcher. A typical use case is to scale frames or convert them to grayscale before emitting them to the next bolt. This example shows a rather 'extreme' case with a Fetcher and a SequentialFrameOperation, which makes it possible to perform an unlimited number of operations within the spout. The topology is shown below.

[figure: FetchAndOperate topology]

// List with operations to be executed on the output of the Fetcher
List<SingleInputOperation> operations = new ArrayList<SingleInputOperation>();
operations.add(new FeatureExtractionOperation("sift", FeatureDetector.SIFT, DescriptorExtractor.SIFT).outputFrame(true));
operations.add(new DrawFeaturesOperation() );
	
int frameSkip = 13; 
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new CVParticleSpout( 
	new FetchAndOperateFetcher( // meta fetcher to combine a Fetcher and Operation
		new StreamFrameFetcher(urls).frameSkip(frameSkip), // normal fetcher
	new SequentialFrameOperation(operations).outputFrame(true).retainImage(true) // sequential operation executed on fetcher output
	)) , 2 );
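The effect of frameSkip can be sketched in plain Java (an illustration only, assuming `frameSkip(n)` means only every n-th frame of the stream is processed):

```java
import java.util.ArrayList;
import java.util.List;

// Keep only every 'frameSkip'-th frame (by frame number) from a stream.
// Skipping frames reduces the load on all downstream operations at the
// cost of temporal resolution.
public class FrameSkipSketch {

    public static List<Integer> keepEveryNth(List<Integer> frameNumbers, int frameSkip) {
        List<Integer> kept = new ArrayList<>();
        for (int nr : frameNumbers) {
            if (nr % frameSkip == 0) kept.add(nr);
        }
        return kept;
    }
}
```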

Example 8 - Background Subtraction

This example shows the use of the BackgroundSubtractionOp operation class. The operation uses OpenCV's MOG background subtractor (BackgroundSubtractorMOG) to compute the background using a Mixture-of-Gaussians model. Please see also the OpenCV tutorial 'How to Use Background Subtraction Methods'.

[figure: background subtraction topology]

// now create the topology itself (spout -> background subtraction -> streamer)
TopologyBuilder builder = new TopologyBuilder();

// just one spout reading streams; i.e. this spout reads two streams in parallel
builder.setSpout("spout", new CVParticleSpout( new StreamFrameFetcher(urls).frameSkip(frameSkip) ), 1 );

// add bolt that does background subtraction (choose MOG or MOG2 as algorithm)
builder.setBolt("backgroundsubtraction", new SingleInputBolt( new BackgroundSubtractionOp().setAlgorithm(BSAlgorithm.MOG)), 1)
.shuffleGrouping("spout");
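To illustrate the principle of background subtraction, the sketch below uses a simple per-pixel running average instead of OpenCV's Mixture-of-Gaussians model (MOG maintains several Gaussians per pixel; this single-estimate version is only meant to show the idea):

```java
// Simplified background subtraction (running average, far simpler than
// the Mixture-of-Gaussians model OpenCV uses): maintain a per-pixel
// background estimate and flag pixels that deviate beyond a threshold.
public class BackgroundSketch {

    private final double[] background;
    private final double alpha;      // learning rate of the background model
    private final double threshold;  // minimum deviation to count as foreground

    public BackgroundSketch(double[] initial, double alpha, double threshold) {
        this.background = initial.clone();
        this.alpha = alpha;
        this.threshold = threshold;
    }

    // Returns a foreground mask for this frame and updates the model.
    public boolean[] apply(double[] frame) {
        boolean[] foreground = new boolean[frame.length];
        for (int i = 0; i < frame.length; i++) {
            foreground[i] = Math.abs(frame[i] - background[i]) > threshold;
            background[i] = (1 - alpha) * background[i] + alpha * frame[i];
        }
        return foreground;
    }
}
```

Because the background adapts with every frame, a pixel that stays constant slowly merges into the background, which is the same general behavior the MOG model exhibits.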

Running examples on a cluster

Only a few lines of code need to be changed to prepare the topologies described above for execution on a real cluster instead of a local machine.

Change:

// simulate the topology locally
LocalCluster cluster = new LocalCluster();
cluster.submitTopology( "topology_name", conf, builder.createTopology() );
Utils.sleep(120*1000); 
cluster.shutdown();

Simply into:

// deploy the topology on the nimbus node specified in the Storm configuration (storm.yaml file)
StormSubmitter.submitTopology("topology_name", conf, builder.createTopology());

Before a topology can be deployed on a cluster it needs to be packaged into a single jar file, excluding the Storm library and its dependencies. This packaging can be done using Maven (which makes life a lot easier) but that is not required. The jar file can also contain additional resources such as model files and C libraries (for example OpenCV). See the deployment guide for how this package can be created and deployed on a cluster.
