
Examples

sensorstorm edited this page Dec 5, 2014 · 27 revisions

Building computer vision topologies

StormCV differs from Storm in that its primary building blocks are Fetchers and Operations rather than Spouts and Bolts. StormCV topologies are created by defining and linking a number of Spouts and Bolts (as in normal Storm topologies) and specifying which Fetchers and Operations they should execute. The basics are explained using a number of examples that include the most important code. The full versions of these examples can be found in the stormcv examples package. Please note that these examples have been designed to run on localhost and thus use a small number of spouts and bolts. The example topologies can be enlarged by adding more data to process and increasing the parallelism hints of spouts and bolts. This can be done on localhost as well, but there it only increases the number of threads and does not give any performance boost.

Most of the examples require the right OpenCV library to be loaded, which can be specified in the StormCVConfig like this:

conf.put(StormCVConfig.STORMCV_OPENCV_LIB, "mac64_opencv_java248.dylib");

If STORMCV_OPENCV_LIB has not been specified, the platform will attempt to load the right library itself, which might not work (depending on the platform).
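To avoid depending on automatic detection, the library name can be chosen explicitly from the JVM's platform properties. The sketch below is one possible way to do that and is not part of StormCV: the class OpenCvLibChooser is hypothetical, and only the name mac64_opencv_java248.dylib comes from the line above. The Windows and Linux names are assumed placeholders that follow the same pattern and must be replaced by the files actually shipped with your build.

```java
import java.util.Locale;

// Sketch: choose an OpenCV native library name from platform properties.
// Only "mac64_opencv_java248.dylib" appears in the text above; the other
// file names are hypothetical placeholders following the same pattern.
final class OpenCvLibChooser {

    static String choose(String osName, String osArch) {
        String os = osName.toLowerCase(Locale.ROOT);
        String bits = osArch.contains("64") ? "64" : "32";
        if (os.contains("mac")) {
            return "mac" + bits + "_opencv_java248.dylib";
        } else if (os.contains("win")) {
            return "win" + bits + "_opencv_java248.dll";  // assumed name
        } else {
            return "linux" + bits + "_opencv_java248.so"; // assumed name
        }
    }

    public static void main(String[] args) {
        String lib = choose(System.getProperty("os.name"), System.getProperty("os.arch"));
        // In a topology the value would then be handed to the configuration:
        // conf.put(StormCVConfig.STORMCV_OPENCV_LIB, lib);
        System.out.println(lib);
    }
}
```

The macOS check comes first on purpose, so that a name containing both "mac" and "win" (such as "Darwin"-style strings) cannot be misread as Windows.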

Example 1 - scaling and grayscale

The first topology does not require OpenCV and hence can be executed without an OpenCV library on the classpath. This topology reads publicly available streams from street cameras, scales them, converts them to grayscale and streams the result using a web service as an MJPEG stream. The actual topology looks like this:

[Figure: topology of Example 1]

The Streamer creates a web service on port 8558 (by default) which enables users to see the output of the topology which is typically useful for testing purposes. Note that the StreamerOperation is a BatchOperation; it waits until multiple subsequent frames are present before sending the first frame to any clients.
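The waiting behaviour of a batch operation can be illustrated with a small JDK-only sketch. It is not StormCV's SlidingWindowBatcher: the class SlidingWindowSketch is hypothetical, and it assumes that "subsequent" frames are frames whose sequence numbers lie exactly delta apart (delta being the frameSkip) and that the window slides forward one frame at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Sketch of a sliding window over frame sequence numbers (assumed semantics).
final class SlidingWindowSketch {
    private final int windowSize;
    private final int delta;
    private final TreeSet<Long> buffer = new TreeSet<Long>();

    SlidingWindowSketch(int windowSize, int delta) {
        this.windowSize = windowSize;
        this.delta = delta;
    }

    // Buffers a sequence number. Returns the window starting at the oldest
    // buffered frame once it is complete, otherwise an empty list.
    // At most one window is returned per call.
    List<Long> offer(long sequenceNr) {
        buffer.add(sequenceNr);
        long first = buffer.first();
        List<Long> window = new ArrayList<Long>();
        for (int i = 0; i < windowSize; i++) {
            long expected = first + (long) i * delta;
            if (!buffer.contains(expected)) {
                return new ArrayList<Long>(); // still waiting for a frame
            }
            window.add(expected);
        }
        buffer.remove(first); // slide the window forward by one frame
        return window;
    }

    public static void main(String[] args) {
        SlidingWindowSketch batcher = new SlidingWindowSketch(2, 13);
        for (long seq = 0; seq <= 39; seq += 13) {
            System.out.println(seq + " -> " + batcher.offer(seq));
        }
    }
}
```

With a window size of 2, nothing is released for the first frame, which is why a client only sees output after at least two frames have arrived.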

The following code is required to build the topology and run it locally. Go to http://localhost:8558/streaming/tiles to see the result of the running topology (available once it is fully started); clicking an image will open the stream. You can alter the frameSkip parameter to extract and analyse more or fewer frames. Please see GrayscaledTopology.java within the examples for the full code.

List<String> urls = new ArrayList<String>(); // list with streams to be read
urls.add( "rtsp://streaming3.webcam.nl:1935/n224/n224.stream" );
urls.add("rtsp://streaming3.webcam.nl:1935/n233/n233.stream");

int frameSkip = 13;
TopologyBuilder builder = new TopologyBuilder();

// add a spout with StreamFrameFetcher which will read the streams
builder.setSpout("spout", 
	new CVParticleSpout( new StreamFrameFetcher(urls).frameSkip(frameSkip) ) , 1 );

// add a bolt with grayscale operation to turn frames to gray (listens to spout)
builder.setBolt("grayscale", new SingleInputBolt( new GrayscaleOperation()),1)
	.shuffleGrouping("spout");

// add a bolt with scale operation to scale down frames to 66% of the original size. This bolt gets input from grayscale
builder.setBolt("scale", new SingleInputBolt( new ScaleImageOperation(0.66f)), 1)
	.shuffleGrouping("grayscale");

// add batch bolt which will wait for 2 subsequent frames from scale bolt before streaming them
builder.setBolt("streamer", new BatchInputBolt( 
		new SlidingWindowBatcher(2, frameSkip).maxSize(6), 
		new MjpegStreamingOperation().port(8558).framerate(5)).groupBy(new Fields(FrameSerializer.STREAMID)
	), 1).shuffleGrouping("scale");

// the code below executes this topology on localhost		
try {
	LocalCluster cluster = new LocalCluster();
	cluster.submitTopology( "grayscaled", conf, builder.createTopology() );
	Utils.sleep(120*1000);
	cluster.shutdown();
} catch (Exception e){
	e.printStackTrace();
}

Note that this topology sends each frame three times (spout to grayscale, grayscale to scale, scale to streamer), which might consume a lot of bandwidth. In a normal scenario it makes sense to compute features early in the topology and drop the frames afterwards.
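What the grayscale and scale steps do to a single frame can be reproduced with plain JDK imaging classes, which also shows why no OpenCV library is needed for this topology. The sketch below is not StormCV's GrayscaleOperation or ScaleImageOperation; the class GrayScaleSketch and its method names are hypothetical, and it assumes frames are available as BufferedImage objects.

```java
import java.awt.Graphics2D;
import java.awt.RenderingHints;
import java.awt.image.BufferedImage;

// Sketch: grayscale conversion and scaling of one frame using only the JDK.
final class GrayScaleSketch {

    // Redraws the source into an 8-bit gray image of the same size.
    static BufferedImage toGray(BufferedImage src) {
        BufferedImage gray = new BufferedImage(
                src.getWidth(), src.getHeight(), BufferedImage.TYPE_BYTE_GRAY);
        Graphics2D g = gray.createGraphics();
        g.drawImage(src, 0, 0, null);
        g.dispose();
        return gray;
    }

    // Scales the source by the given factor (0.66f gives 66% of the size).
    static BufferedImage scale(BufferedImage src, float factor) {
        int w = Math.max(1, Math.round(src.getWidth() * factor));
        int h = Math.max(1, Math.round(src.getHeight() * factor));
        // Custom image types cannot be passed to this constructor.
        int type = src.getType() == BufferedImage.TYPE_CUSTOM
                ? BufferedImage.TYPE_INT_RGB : src.getType();
        BufferedImage out = new BufferedImage(w, h, type);
        Graphics2D g = out.createGraphics();
        g.setRenderingHint(RenderingHints.KEY_INTERPOLATION,
                RenderingHints.VALUE_INTERPOLATION_BILINEAR);
        g.drawImage(src, 0, 0, w, h, null);
        g.dispose();
        return out;
    }

    public static void main(String[] args) {
        BufferedImage frame = new BufferedImage(640, 480, BufferedImage.TYPE_INT_RGB);
        BufferedImage result = scale(toGray(frame), 0.66f);
        System.out.println(result.getWidth() + "x" + result.getHeight());
    }
}
```

Scaling first and converting second would give a similar result while moving fewer pixels between the two steps, which is the kind of ordering decision the bandwidth note above is about.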

Example 2 - face detection

The topology in this example simply reads a directory containing a couple of images, performs face detection, extracts faces from the frames, draws the faces in the image and stores them in a local directory (see figure below).

[Figure: topology of Example 2]

The following snippet shows the most important lines of code. Please see example 1 or the code in the examples package for how to run it locally. It is also possible to read a directory with images from a remote location, for example an FTP server, by providing a URL such as ftp://path/to/ftp/dir.

Please see FacedetectionTopology.java within the examples for the full code.

List<String> files = new ArrayList<String>(); // list with files to process
files.add( "file://path/to/directory" );

TopologyBuilder builder = new TopologyBuilder();

// add 1 spout with ImageFetcher reading the images from the provided paths
builder.setSpout("spout", new CVParticleSpout(new ImageFetcher(files).sleep(100)), 1 ); // sleep is optional 

// add 1 bolt with cascade classifier initialized with face model. This operation will output the full frame with face features inside
builder.setBolt("face_detect",new SingleInputBolt( 
		new HaarCascadeOperation("face","lbpcascade_frontalface.xml").outputFrame(true)
	), 1).shuffleGrouping("spout");

// extracts detected faces from the frames (based on the bounding box)
builder.setBolt("face_extract", new SingleInputBolt(
		new ROIExtractionOperation("face").spacing(25)
	), 1).shuffleGrouping("face_detect");

// add 1 drawer that simply draws features into a frame and stores them locally (used for testing)
builder.setBolt("drawer", new SingleInputBolt(
		new DrawFeaturesOperation().destination("file://path/to/output/directory")
	), 1).shuffleGrouping("face_extract");
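The face_extract step crops each detected bounding box out of the frame. A minimal sketch of that crop computation is given below. It is not StormCV's ROIExtractionOperation: the class RoiSketch is hypothetical, and it assumes that spacing(25) means a margin of 25 pixels added on every side of the bounding box, clipped to the frame borders.

```java
import java.awt.Rectangle;

// Sketch: grow a bounding box by a margin and clip it to the frame.
// The interpretation of "spacing" as a pixel margin is an assumption.
final class RoiSketch {

    static Rectangle expand(Rectangle box, int spacing, int frameWidth, int frameHeight) {
        int x0 = Math.max(0, box.x - spacing);
        int y0 = Math.max(0, box.y - spacing);
        int x1 = Math.min(frameWidth, box.x + box.width + spacing);
        int y1 = Math.min(frameHeight, box.y + box.height + spacing);
        return new Rectangle(x0, y0, x1 - x0, y1 - y0);
    }

    public static void main(String[] args) {
        Rectangle face = new Rectangle(100, 100, 50, 50);
        System.out.println(expand(face, 25, 640, 480));
    }
}
```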

The performance of this topology can be improved by increasing the number of bolts for the slowest step in the topology, in this example the face detection. The number can easily be increased by changing the ‘parallelism hint’ for this bolt as shown in the lines below (changed from 1 to 8). Each frame from the spout is then randomly sent to one of the eight face detection bolts present.

// same bolt id as before, so that "face_extract" can still subscribe to it
builder.setBolt("face_detect", new SingleInputBolt(
	new HaarCascadeOperation("face", "lbpcascade_frontalface.xml").outputFrame(true)), 8)
.shuffleGrouping("spout");
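A rough way to pick a parallelism hint is to measure how long one bolt needs per frame and divide the desired frame rate by what a single bolt can sustain. The sketch below does this arithmetic. It is a simplification under stated assumptions: throughput is assumed to scale linearly with the number of bolts (which holds on a cluster with enough cores, not on localhost), and the 400 ms per frame used in main is a made-up figure, not a measurement.

```java
// Sketch: back-of-the-envelope sizing of a parallelism hint.
// Assumes linear scaling; the timings used here are hypothetical.
final class ParallelismSketch {

    // Frames per second a step can sustain with the given number of bolts.
    static double throughput(int parallelism, double millisPerFrame) {
        return parallelism * 1000.0 / millisPerFrame;
    }

    // Smallest parallelism hint that sustains the target frame rate.
    static int requiredParallelism(double millisPerFrame, double targetFps) {
        return (int) Math.ceil(targetFps * millisPerFrame / 1000.0);
    }

    public static void main(String[] args) {
        double faceDetectMillis = 400.0; // hypothetical cost per frame
        System.out.println("1 bolt:  " + throughput(1, faceDetectMillis) + " fps");
        System.out.println("8 bolts: " + throughput(8, faceDetectMillis) + " fps");
        System.out.println("hint for 20 fps: " + requiredParallelism(faceDetectMillis, 20.0));
    }
}
```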

Example 3 - extracting multiple features

This example reads video, performs two types of feature extraction in parallel and combines the results afterwards. Combining simply means putting them in the same Frame object before sending the total result to the next bolt. This topology would typically end with a bolt that stores this combined result, but that is not important for this example.

[Figure: topology of Example 3]

Note that the Feature Combiner is a BatchOperation that requires 2 Feature objects with the same streamId and sequenceNr (i.e. features belonging to the same frame). The combiner must wait until it has received all required features. Thus this topology runs as slowly as the slowest feature extractor, which would typically be the SIFT operation. For optimal operation of the topology it is important to configure a good parallelism hint for each bolt. The right number of bolts can be estimated locally but is likely to change somewhat when deployed on the cluster (due to different hardware etc.). In this example it makes sense to configure 4 times more SIFT bolts than face detection bolts. The code snippet below only covers the topology from Scale to Combiner; please see MultipleFeaturesTopology.java within the examples for the full code.

// add bolt that scales frames down to 25% of the original size 
builder.setBolt("scale", new SingleInputBolt( new ScaleImageOperation(0.25f)), 1)
	.shuffleGrouping("spout");

// one bolt with a HaarCascade classifier detecting faces. This operation outputs a Frame including the Features with detected faces.
// lbpcascade_frontalface.xml must be present on the classpath!
builder.setBolt("face", new SingleInputBolt( 
		new HaarCascadeOperation("face", "lbpcascade_frontalface.xml").outputFrame(true)),1)
	.shuffleGrouping("scale");

// add a bolt that performs SIFT keypoint extraction which emits Feature objects instead of Frame
builder.setBolt("sift", new SingleInputBolt( 
		new FeatureExtractionOperation("sift", FeatureDetector.SIFT, DescriptorExtractor.SIFT).outputFrame(false)), 2)
	.shuffleGrouping("scale");
	
// Batch bolt that waits for input from both the face and sift detection bolts and combines them in a single frame object
builder.setBolt("combiner", new BatchInputBolt(
		new SequenceNrBatcher(2), 
		new FeatureCombinerOperation()), 1)
	.fieldsGrouping("sift", new Fields(FrameSerializer.STREAMID)) //register on sift
	.fieldsGrouping("face", new Fields(FrameSerializer.STREAMID)); // register on face
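The combiner's waiting logic amounts to a join on (streamId, sequenceNr). The JDK-only sketch below illustrates the idea. It is not StormCV's SequenceNrBatcher: the class SequenceJoinSketch is hypothetical and features are represented by plain strings. The fieldsGrouping on STREAMID in the snippet above matters for the same reason: both features of a frame must arrive at the same combiner task, otherwise no task would ever see a complete pair.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: buffer items per (streamId, sequenceNr) until a group is complete.
final class SequenceJoinSketch {
    private final int expected;
    private final Map<String, List<String>> pending = new HashMap<String, List<String>>();

    SequenceJoinSketch(int expected) {
        this.expected = expected;
    }

    // Returns the complete group once 'expected' items share the same key,
    // otherwise an empty list.
    List<String> offer(String streamId, long sequenceNr, String feature) {
        String key = streamId + "#" + sequenceNr;
        List<String> group = pending.get(key);
        if (group == null) {
            group = new ArrayList<String>();
            pending.put(key, group);
        }
        group.add(feature);
        if (group.size() < expected) {
            return Collections.emptyList(); // keep waiting for the rest
        }
        pending.remove(key); // the frame is complete, free the buffer
        return group;
    }

    public static void main(String[] args) {
        SequenceJoinSketch combiner = new SequenceJoinSketch(2);
        System.out.println(combiner.offer("cam1", 13, "face"));
        System.out.println(combiner.offer("cam1", 13, "sift"));
    }
}
```

A production version would also need to expire groups that never complete, for example when one extractor fails on a frame; that is left out here.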

Example 4 - Sequential operation

An alternative topology that produces exactly the same results is shown below. Within this topology the two feature extraction operations are executed sequentially using a SequentialFrameOperation. This minimizes the number of times a frame has to be emitted and avoids the dimensionality problem described before. However, the total execution time for a frame (from fetcher to last operation) is longer. Which alternative is best really depends on the use case. By setting the parallelism hint for the fat bolt to 3, this topology uses exactly the same amount of resources as the one in example 3. Please see FatFeaturesTopology.java within the examples for the full code.

[Figure: topology of Example 4]

// specify the list with SingleInputOperations to be executed sequentially by the 'fat' bolt
List<SingleInputOperation<CVParticle>> operations = new ArrayList<SingleInputOperation<CVParticle>>();
operations.add(new HaarCascadeOperation("face","lbpcascade_frontalface.xml") );
operations.add(new FeatureExtractionOperation("sift", FeatureDetector.SIFT, DescriptorExtractor.SIFT));

// add bolt that scales frames down to 25% of the original size 
builder.setBolt("scale", new SingleInputBolt( new ScaleImageOperation(0.25f)), 1)
	.shuffleGrouping("spout");

// Three 'fat' bolts containing a SequentialFrameOperation which will emit a Frame object containing the detected face and SIFT features
builder.setBolt("fat_features", new SingleInputBolt( 
		new SequentialFrameOperation(operations).outputFrame(true).retainImage(true) ), 3)
	.shuffleGrouping("scale");
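Conceptually, the 'fat' bolt chains its operations so that the output of one becomes the input of the next, all inside a single bolt. The sketch below shows that composition with java.util.function.Function. It is not StormCV's SequentialFrameOperation: the class SequentialSketch is hypothetical, and a frame is represented by a plain string to which each step appends the feature it would have added.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch: run a list of single-input steps one after another.
final class SequentialSketch {

    // Folds the list into one function that applies the steps in list order.
    static <T> Function<T, T> sequence(List<Function<T, T>> operations) {
        Function<T, T> chain = Function.identity();
        for (Function<T, T> operation : operations) {
            chain = chain.andThen(operation);
        }
        return chain;
    }

    public static void main(String[] args) {
        List<Function<String, String>> operations = new ArrayList<Function<String, String>>();
        operations.add(frame -> frame + "+face"); // stands in for face detection
        operations.add(frame -> frame + "+sift"); // stands in for SIFT extraction
        System.out.println(sequence(operations).apply("frame"));
    }
}
```

Because the steps run back to back, the time spent on one frame is the sum of the individual steps, which matches the longer per-frame execution time mentioned above.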

Example 5 - Tiling

StormCV contains a TilingOperation which can split a single frame into a number of possibly overlapping ‘tiles’. A high resolution frame can, for example, be split into 4 tiles (2x2), each of which can be analyzed in parallel, which can increase the overall processing speed. Effectively each tile (for example the top-left one) gets its own streamId, which can be used to route tiles through the same bolts. The topology below shows a very simple tiling topology where each frame is split into 4 tiles. The top-right and bottom-left tiles are routed to a grayscale operation while the other two are not. The TileMerger stitches the tiles back together and also combines features the tiles might have (not the case in this example). This results in a stream where half of each frame is grayscale and the other half is full color. This example can be executed without OpenCV. The routing of tiles is done by the DummyTileGrouping, which is present in the examples package and was created specially for this example.

[Figure: topology of Example 5]

Output of this TilingTopology is a stream looking like this.

[Figure: example output of the TilingTopology]

Please see TilingTopology.java within the examples for the full code and implementation of DummyTileGrouping.

// splits each frame into 4 tiles (2x2) with 0 pixels overlap
builder.setBolt("tiler", new SingleInputBolt(new TilingOperation(2, 2).overlap(0)), 1)
	.shuffleGrouping("spout");

// execute grayscale operation only on tiles 1 and 2 (top right and bottom left)
builder.setBolt("grayscale", new SingleInputBolt(new GrayscaleOperation()), 1)
	.customGrouping("tiler", new DummyTileGrouping(new String[]{"1", "2"}));

// The tile merger stitches the tiles back together and more importantly also merges features created for each tile. 
// The result of the merger is almost the same as if the tiling did not take place 
builder.setBolt("tile_merger", new BatchInputBolt(
		new SequenceNrBatcher(4),
		new TilesRecombinerOperation().outputFrame(true)).groupBy(new Fields(FrameSerializer.SEQUENCENR)
	), 1).shuffleGrouping("grayscale") // only gets tiles 1 and 2 from grayscale
	.customGrouping("tiler", new DummyTileGrouping(new String[]{"0", "3"}));
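The geometry behind a 2x2 split can be sketched with a few lines of JDK code. This is not StormCV's TilingOperation: the class TilingSketch is hypothetical. It numbers tiles row by row (0 = top left, 1 = top right, 2 = bottom left, 3 = bottom right), which matches the tile numbers used in the snippet above, and it assumes that overlap means each tile is extended by that many pixels towards its neighbours.

```java
import java.awt.Rectangle;
import java.util.ArrayList;
import java.util.List;

// Sketch: compute tile rectangles for a frame, numbered row by row.
// How overlap is applied is an assumption made for this illustration.
final class TilingSketch {

    static List<Rectangle> tiles(int width, int height, int cols, int rows, int overlap) {
        List<Rectangle> result = new ArrayList<Rectangle>();
        int tileW = width / cols;
        int tileH = height / rows;
        for (int r = 0; r < rows; r++) {
            for (int c = 0; c < cols; c++) {
                int x0 = Math.max(0, c * tileW - overlap);
                int y0 = Math.max(0, r * tileH - overlap);
                // The last column and row absorb the remainder of an uneven split.
                int x1 = (c == cols - 1) ? width : Math.min(width, (c + 1) * tileW + overlap);
                int y1 = (r == rows - 1) ? height : Math.min(height, (r + 1) * tileH + overlap);
                result.add(new Rectangle(x0, y0, x1 - x0, y1 - y0));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Rectangle> tiles = tiles(640, 480, 2, 2, 0);
        for (int i = 0; i < tiles.size(); i++) {
            System.out.println("tile " + i + ": " + tiles.get(i));
        }
    }
}
```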

Example 6 - GroupOfFrames

This example shows the use of the FrameGrouperOperation, which collects a configurable number of frames and puts them in a GroupOfFrames object before sending it to the next bolt. This enables the topology to use BatchOperations without a Batcher, which in turn makes it possible to place these BatchOperations more flexibly (for example behind a shuffleGrouping). The example topology below shows a pipeline used to scale down frames, group two subsequent frames, perform dense optical flow on them, convert the optical flow to grayscale and stream the result. The key element here is the grouping of frames, which is very cheap and can be done by a single bolt, followed by a potentially large number of OpticalFlow operations connected using shuffleGrouping.

[Figure: topology of Example 6]

The code below only shows the topology from the FrameGrouper up to the Drawer. Please see BigFlowTopology.java within the examples for the full code and the implementation of the OpticalFlowVisualizeOperation.

// groups two subsequent frames together in a single GroupOfFrames object
builder.setBolt("grouper", new BatchInputBolt(
	new DiscreteWindowBatcher(2, 1), 
	new FrameGrouperOperation()
), 1).fieldsGrouping("scale",new Fields(FrameSerializer.STREAMID));

// Two bolts with a GroupOfFramesOperation that calculate optical flow on the groups they receive.
// This makes the optical flow stateless! 
builder.setBolt("optical_flow", new SingleInputBolt(
		new GroupOfFramesOperation(new OpticalFlowOperation("optical flow"))
	), 2).shuffleGrouping("grouper");

		
// Simple operation that visualizes the optical flow and puts it into a Frame object that can be shown / streamed
builder.setBolt("flow_viz", new SingleInputBolt(
		new OpticalFlowVisualizeOperation("optical flow")), 1 )
	.shuffleGrouping("optical_flow");
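The reason the optical flow bolts can sit behind a shuffleGrouping is that every group already carries all the frames the operation needs. The sketch below illustrates the grouping step on plain sequence numbers. It is not StormCV's DiscreteWindowBatcher or FrameGrouperOperation: the class DiscreteWindowSketch is hypothetical, and it assumes that a discrete window produces non-overlapping groups, so each frame ends up in exactly one group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch: cut an ordered list of frames into non-overlapping groups.
final class DiscreteWindowSketch {

    // Frames left over at the end that do not fill a group are not emitted.
    static <T> List<List<T>> group(List<T> frames, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> groups = new ArrayList<List<T>>();
        for (int i = 0; i + size <= frames.size(); i += size) {
            groups.add(new ArrayList<T>(frames.subList(i, i + size)));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Integer> sequenceNrs = Arrays.asList(0, 1, 2, 3, 4);
        // Each printed group could be handed to any optical flow worker.
        System.out.println(group(sequenceNrs, 2));
    }
}
```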

Example 7 - FetchAndOperate

StormCV contains a special type of Fetcher that enables the use of an Operation directly within the spout. This method minimizes data transfer but can only be used if the operation can keep up with the fetcher. A typical use case is to scale frames or convert them to grayscale before emitting them to the next bolt. This example shows a rather ‘extreme’ case with a Fetcher and a SequentialFrameOperation, which makes it possible to perform any number of operations within the spout. The topology is shown below.

[Figure: topology of Example 7]

List<SingleInputOperation> operations = new ArrayList<SingleInputOperation>();
operations.add(new FeatureExtractionOperation("sift", FeatureDetector.SIFT, DescriptorExtractor.SIFT).outputFrame(true));
operations.add(new DrawFeaturesOperation() );

int frameSkip = 13; 
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new CVParticleSpout( 
	new FetchAndOperateFetcher( // meta fetcher to combine a Fetcher and Operation
		new StreamFrameFetcher(urls).frameSkip(frameSkip), // normal fetcher
		new SequentialFrameOperation(operations).outputFrame(true).retainImage(true) // sequential operation executed on fetcher output
	)) , 2 );

Running examples on a cluster

Only a few lines of code need to be changed to prepare the topologies described above for execution on a real cluster instead of on a local machine.

Replace:

LocalCluster cluster = new LocalCluster();
cluster.submitTopology( "topology_name", conf, builder.createTopology() );
Utils.sleep(120*1000); 
cluster.shutdown();

with:

StormSubmitter.submitTopology("topology_name", conf, builder.createTopology());

Before a topology can be deployed on a cluster it needs to be packaged in a single jar file excluding the Storm library and its dependencies. This packaging can be done using Maven (which makes life a lot easier), but Maven is not required. The jar file can also contain additional resources such as model files and C libraries (for example OpenCV). See the deployment guide for how this package can be created and deployed on a cluster.
