
Advanced


Extending StormCV

Adding FileConnectors

StormCV requires registered FileConnector implementations to get files from remote locations. The current distribution contains only two connectors: one for local files and one for Amazon S3 buckets. Connectors can be added by implementing the FileConnector interface and registering the class in the StormCVConfig. Fetchers that have to read remote files use the registered connectors to do so. The scheme of the URI pointing to a location is used to select the right connector; for example, the URI ‘s3://bucket/some/path’ will be passed to the S3Connector because of its ‘s3’ scheme. Fetchers do not need to be changed to make this work. The FileConnector functions that need to be implemented should speak for themselves; a skeleton implementation is sketched after this list:

  • getProtocol(): returns the protocol the connector can work with, e.g. http, ftp, file

  • prepare(Map stormConf): is called directly after construction and enables connectors to initialize themselves, possibly using parameters registered in the config (for example username and password).

  • setExtensions(String[] extensions): is used to set the acceptable file extensions used by the list() function. The ImageFetcher, for example, uses setExtensions to ensure only image files are listed.

  • moveTo(URI): moves the current working point (file or directory) to the specified URI

  • copyFile(File localFile, boolean delete): uploads the provided file to the current working point on the remote location. If delete is set to true the local file must be deleted after the upload has finished.

  • list(): lists the files within the current working directory that match the extensions set. If the current working point is a file that matches the configured extensions it should return that file.

  • getAsFile(): downloads the current working file to the local filesystem and returns the File reference once the download is done. Files should typically be downloaded to the local system's temp directory.

  • deepCopy(): returns a new instance of itself including any configuration such as usernames and passwords. Copies can be used by bolts to access the same remote location using different instances (for example for parallel downloading of files).
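A skeleton implementation might look as follows. This is a minimal sketch: the FtpConnector name, the config keys and the exact method signatures are assumptions based on the function descriptions above, not part of the StormCV distribution.

// hypothetical skeleton of a connector for ftp locations
public class FtpConnector implements FileConnector {

	private String[] extensions;
	private URI location; // the current working point
	private String username, password;

	// this connector will be selected for URIs like ftp://host/some/path
	public String getProtocol(){
		return "ftp";
	}

	// read credentials from the topology configuration (keys are hypothetical)
	public void prepare(Map stormConf){
		this.username = (String)stormConf.get("ftp.username");
		this.password = (String)stormConf.get("ftp.password");
	}

	// extensions used by list() to filter files
	public void setExtensions(String[] extensions){
		this.extensions = extensions;
	}

	// point the connector at a new file or directory
	public void moveTo(URI uri){
		this.location = uri;
	}

	// upload the provided file to the current working point
	public void copyFile(File localFile, boolean delete){
		// ... perform the actual upload using an ftp client here
		if(delete) localFile.delete();
	}

	// list remote files below the current working point matching the extensions
	public List<String> list(){
		List<String> files = new ArrayList<String>();
		// ... fill the list using an ftp client here
		return files;
	}

	// download the current working file to the local temp directory
	public File getAsFile(){
		File local = new File(System.getProperty("java.io.tmpdir"), "ftp_download.tmp");
		// ... perform the actual download here
		return local;
	}

	// return a new instance holding the same configuration
	public FileConnector deepCopy(){
		FtpConnector copy = new FtpConnector();
		copy.username = username;
		copy.password = password;
		copy.setExtensions(extensions);
		return copy;
	}
}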

New FileConnector implementations must be registered in the StormCVConfig, which enables spouts and bolts to use them:

// register connector class when you are defining the topology
config.registerConnector(YourFileConnector.class);
// Create a ConnectorHolder in the prepare function of Fetchers and Operations
ConnectorHolder cHolder = new ConnectorHolder(stormConf);

Developing Fetchers

Fetchers are responsible for reading data and are executed within Storm spouts. You can create your own Fetcher by implementing the IFetcher interface, which is explained in the dummy implementation below. When a Fetcher must read data from remote locations it can instantiate its own ConnectorHolder using the StormCVConfig holding the registered FileConnectors.

// dummy Fetcher implementation
public class DummyFetcher implements IFetcher<Frame> {

	private String imageType;
	private ConnectorHolder cHolder;
	private List<String> locations;
	private File localFile;

	// do some preparation here, reading parameters from the stormConf for example
	public void prepare(Map stormConf, TopologyContext context) throws Exception{
		// read the preferred image encoding to be used
		this.imageType = (String)stormConf.get(StormCVConfig.STORMCV_FRAME_ENCODING);
		// initialize the connector holder which can be used to read data from remote sources (like Amazon S3)
		this.cHolder = new ConnectorHolder(stormConf);
		// perform any other initialization stuff...
		this.locations = (List<String>)stormConf.get("your_own_config_key");
	}

	// return the right type of Serializer, since this Fetcher produces Frames this method returns a FrameSerializer	
	public CVParticleSerializer<Frame> getSerializer(){
		return new FrameSerializer();
	}

	// called after preparation when the topology is activated
	public void activate(){
		String location = locations.remove(0);
		FileConnector fc = cHolder.getConnector(location);
		fc.moveTo(URI.create(location)); // moveTo expects a URI (see the FileConnector interface)
		this.localFile = fc.getAsFile();
	}

	// called when the topology is deactivated and can be used to free resources, write state etc
	public void deactivate(){}

	// called by the spout to request new data (the result may be null)
	public Frame fetchData(){
		// read a frame from some source, for example from this.localFile;
		// 'imageBytes' here is a placeholder for the encoded image actually read
		byte[] imageBytes = null;
		return new Frame("streamId", 0L, this.imageType, imageBytes, 0L, new Rectangle(0, 0, 800, 640));
	}
}
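Once implemented, a Fetcher is typically wrapped in StormCV's CVParticleSpout so it can run as a regular Storm spout. A minimal sketch (component name and parallelism hint are illustrative):

// wrap the fetcher in a CVParticleSpout and add it to the topology
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("fetcher", new CVParticleSpout(new DummyFetcher()), 1);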

Developing Operations

Operations are responsible for performing actions on frames and/or features. They consist of basic operations that do not use the OpenCV library and operations that do. There are two types of operations:

  • SingleInputOperations work on single frames
  • BatchOperations work on a list of frames

Both of them return zero or more objects of the model (GroupOfFrames, Frame, Feature or Descriptor). Depending on the type returned, the Operation must also provide the matching Serializer implementation. Hence, an operation returning Feature objects must also provide the FeatureSerializer; if the Operation returns anything other than Feature objects, this will cause serialization exceptions.

SingleInputOperations

A single input operation works on a single object it receives. Creating your own single input operation can be done by implementing the ISingleInputOperation interface. The following code implements this interface in a dummy single input operation which just returns the input:

// a dummy single input operation that returns the input
public class DummySingleInputOp implements ISingleInputOperation<Frame> {

	public void prepare(Map stormConf, TopologyContext context)throws Exception {
		// Code that needs to be called once to initialize the operation, such as libraries or dependencies on the initialization parameters of the Topology.
	}

	public void deactivate() {
		// Code to clean up after the topology has run.
	}

	public CVParticleSerializer<Frame> getSerializer() {
		// The serializer to use, here the frame serializer is chosen.
		return new FrameSerializer();
	}

	public List<Frame> execute(CVParticle particle) throws Exception {
		// Code that runs on each frame, here it simply returns the input frame.
		List<Frame> result = new ArrayList<Frame>();
		if(!(particle instanceof Frame)) return result; // check if the particle is a Frame instance
		Frame sf = (Frame)particle;
		result.add(sf);
		return result;
	}
}
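Single input operations are executed by StormCV's SingleInputBolt, which calls execute on every particle it receives. A minimal sketch, reusing the builder and spout name from the Fetcher example above:

// let the operation run in a SingleInputBolt fed by the fetcher spout
builder.setBolt("dummy_op", new SingleInputBolt(new DummySingleInputOp()), 1)
	.shuffleGrouping("fetcher");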

BatchOperations

A batch operation works on multiple frames. Creating your own batch operation can be done by implementing the IBatchOperation interface. The following code implements this interface in a dummy batch operation which just returns the input:

// a dummy batch operation that returns the input
public class DummyBatchOp implements IBatchOperation<Frame> {

	public void prepare(Map stormConf, TopologyContext context) throws Exception {
		// Code that needs to be called once to initialize the operation, such as libraries or dependencies on the initialization parameters of the Topology.
	}

	public void deactivate() {
		// Code to clean up after the topology has run.
	}

	public CVParticleSerializer<Frame> getSerializer() {
		// The serializer to use, here the frame serializer is chosen.
		return new FrameSerializer();
	}

	public List<Frame> execute(List<CVParticle> input) throws Exception {
		// Code that runs on each frame, here it simply returns the input frames.
		return input;
	}
}
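Batch operations run inside StormCV's BatchInputBolt, which pairs the operation with a Batcher that decides when a batch is complete. The sketch below assumes the SlidingWindowBatcher shipped with StormCV (constructor parameters may differ per version) and the STREAMID field constant from CVParticleSerializer to keep frames of the same stream together:

// BatchInputBolt combines a Batcher (here assumed to be a sliding window
// of 2 frames) with the batch operation defined above
builder.setBolt("dummy_batch", new BatchInputBolt(
		new SlidingWindowBatcher(2, 1), new DummyBatchOp()), 1)
	.fieldsGrouping("fetcher", new Fields(CVParticleSerializer.STREAMID));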

OpenCV operations

StormCV contains the abstract class OpenCVOperation that makes it relatively easy to create Operations that work with OpenCV. Loading of the configured OpenCV library is done automatically when OpenCVOperation is extended. Conversion of BufferedImages to Mat and back can be done with the ImageUtils class. An example implementation that only performs this conversion is shown below.

// a dummy OpenCV operation that converts a Frame's image to Mat and back
public class DummyOpenCVOp extends OpenCVOperation<Frame> implements ISingleInputOperation<Frame> {
		
	// nothing to prepare so this remains empty
	protected void prepareOpenCVOp(Map stormConf, TopologyContext context) throws Exception{ }
		
	// execute is called by the bolt that received input
	public List<Frame> execute(Frame input) throws Exception {
		// create a Mat object from the frame's image bytes
		Mat mat = ImageUtils.bytes2Mat(input.getImageBytes());
		// turn the mat back into an image
		BufferedImage image = ImageUtils.Mat2ImageBytes(mat, Frame.JPG_IMAGE);
		// put the image back into the Frame object
		input.setImage(image, Frame.JPG_IMAGE);
		// return the result as a list
		List<Frame> result = new ArrayList<Frame>();
		result.add(input);
		return result;
	}
}
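Which native OpenCV library is loaded can be specified in the StormCVConfig; the library name below is an example and depends on your platform and OpenCV build:

// tell StormCV which native OpenCV library to load on each worker
StormCVConfig conf = new StormCVConfig();
conf.put(StormCVConfig.STORMCV_OPENCV_LIB, "mac64_opencv_java248.dylib");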

Developing Batchers

Advanced elements

Grouping operations within Bolts

Developing CustomGroupings

Groupings determine how data is routed through a topology by selecting the next task a piece of data will be sent to. If there are, for example, three instances of a specific bolt (i.e. a parallelism hint of 3), the specified grouping must decide for each tuple which of the three tasks it must be sent to. Storm contains groupings like ShuffleGrouping and FieldsGrouping, but it is also possible to create your own by implementing the CustomStreamGrouping interface. The implementation below shows how this can be done.

// simple implementation that chooses the task randomly (i.e. same as shuffleGrouping)
public class RandomGrouping implements CustomStreamGrouping {
	
	private List<Integer> targetTasks; // list holding the id's of possible targets to send data to

	// prepare the grouping, most important is to get the list with target task id's
	public void prepare(WorkerTopologyContext context, GlobalStreamId streamId, List<Integer> targetTasks) {
		this.targetTasks = targetTasks;
	}

	// return the ids of the tasks to send the tuple (values) to.
	// returning multiple ids means the data is duplicated;
	// returning an empty list will cause the system to drop the tuple, which can be used to filter items!
	public List<Integer> chooseTasks(int taskId, List<Object> values) {
		List<Integer> targets = new ArrayList<Integer>(); // list with ids to send the values to
		targets.add( targetTasks.get( (int)Math.floor(Math.random() * targetTasks.size()) ) );
		return targets;
	}
}

As shown in this example, groupings have the ability to duplicate data or filter it. Filtering items will not break Storm's fault-tolerance model.
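Custom groupings are attached when the topology is defined, using Storm's standard customGrouping method on the bolt declarer (component names are illustrative):

// route tuples from the fetcher to three instances of the bolt using RandomGrouping
builder.setBolt("dummy_op", new SingleInputBolt(new DummySingleInputOp()), 3)
	.customGrouping("fetcher", new RandomGrouping());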

Extending the model

If the current model does not suit your needs you can extend it or create your own objects. Extending the model requires three steps:

  1. Create a new class which extends CVParticle or an already existing class in the model package. You can add whatever fields you want in the class.

  2. Create a Serializer for the new Class by extending nl.tno.stormcv.model.serializer.CVParticleSerializer and implementing it accordingly

  3. Register the serializer in the StormCVConfig when you build a topology so the platform can actually (de)serialize objects of the newly created class.

The serializer converts objects to and from Storm Tuples as well as to and from byte representations; the latter is required when the object is nested within another object. Please see the Kryo documentation for how this works. A small example is given below.

Class

// a dummy custom CVParticle class containing three fields
public class YourClass extends CVParticle {
		
	private int someInt;
	private String someString;
	private Feature feature;

	public YourClass(String streamId, long sequenceNr, int someInt, String someString, Feature feature){
		super(streamId, sequenceNr);
		this.someInt = someInt;
		this.someString = someString;
		this.feature = feature;
	}

	public YourClass(Tuple tuple, int someInt, String someString, Feature feature){
		super(tuple);
		this.someInt = someInt;
		this.someString = someString;
		this.feature = feature;
	}

	// and some getters and setters...
}

Serializer

// Serializer implementation for YourClass
public class DummySerializer extends CVParticleSerializer<YourClass> {

	// static field names make the fields easier to reference
	public static final String SOME_INT = "someInt";
	public static final String SOME_STRING = "someString";
	public static final String FEATURE = "feature";

	// specifies the field names of YourClass that will be serialized and in what order
	protected List<String> getTypeFields() {
		List<String> fields = new ArrayList<String>(3);
		fields.add(SOME_INT);
		fields.add(SOME_STRING);
		fields.add(FEATURE);
		return fields;
	}

	// write fields of YourClass as a set of Values in the exact same order specified in getTypeFields()!
	protected Values getValues(YourClass object) throws IOException {
		return new Values(object.getSomeInt(), object.getSomeString(), object.getFeature());
	}

	// create an object of YourClass given the values provided in the tuple. These values are the ones written in the getValues() function
	protected YourClass createObject(Tuple tuple) throws IOException {
		return new YourClass(tuple, tuple.getIntegerByField(SOME_INT), 
			tuple.getStringByField(SOME_STRING), (Feature)tuple.getObjectByField(FEATURE) );
	}

	// writes the fields of a YourClass object to the kryo output
	protected void writeObject(Kryo kryo, Output kryoOutput, YourClass object) throws Exception {
		kryoOutput.writeInt(object.getSomeInt());
		kryoOutput.writeString(object.getSomeString());
		kryo.writeObject(kryoOutput, object.getFeature());
	}

	// reads the fields of YourClass from the kryo input in the same order they were written in writeObject(...)
	protected YourClass readObject(Kryo kryo, Input input, Class<YourClass> clas, String streamId, long sequenceNr) throws Exception {
		int someInt = input.readInt();
		String someString = input.readString();
		Feature feature = kryo.readObject(input, Feature.class);
		return new YourClass(streamId, sequenceNr, someInt, someString, feature);
	}
}
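For step 3, Storm's standard registerSerialization method can be used to bind the class to its serializer, since CVParticleSerializer implementations also act as Kryo serializers (see the writeObject/readObject methods above). A sketch using the classes above:

// register the serializer so Storm's Kryo instances can handle YourClass
StormCVConfig conf = new StormCVConfig();
conf.registerSerialization(YourClass.class, DummySerializer.class);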

Tips & Tricks

Remove images from Frame objects as early in the topology as possible. Retaining images within Frames can consume a lot of bandwidth: a single Frame processed by three bolts will also be transmitted three times (unless the bolts are located on the same server). Imagine how this affects the network load when processing each frame (encoded as JPG) from a large number of streams. Hence the tip: ditch the actual images as early in the topology as possible and continue processing the calculated features.

Chain operations within a single bolt whenever possible to minimize overhead. This tip follows the reasoning described above: transmitting the same frame multiple times consumes more bandwidth and also CPU resources due to (de)serialization. See example 4 for how this can be done.

Estimating parallelism hints for bolts is tricky business, especially in larger topologies. A good way to start is to run the topology locally with a parallelism hint of 1 for each spout/bolt and register a MetricsConsumer as shown below. In local mode this consumer will print all kinds of metrics for each type of bolt in the topology to the console (every 60 seconds). One of those metrics is the execute latency, which tells you how long it takes a bolt to fully process a tuple. This information can be used to estimate the number of each type of bolt relative to each other: if bolt A takes 100 ms and bolt B 350 ms, you need at least 3.5 times more B bolts than A bolts. The same trick can be used by deploying the topology on the cluster and looking at the average execute latency reported in the web UI. It usually takes a number of iterations to get this right!

    // adding a metrics consumer to the StormCVConfig
    conf.registerMetricsConsumer(LoggingMetricsConsumer.class);

Use setMaxSpoutPending(...) in the configuration to throttle the spouts within the topology by specifying the maximum number of uncompleted tuples each spout may have. Storm will temporarily stop requesting more data from spouts (i.e. fetchers) when the specified maximum is reached, which avoids a flood of Frames in the topology.

    // specifying a maximum of 32 uncompleted frames per spout
    conf.setMaxSpoutPending(32);