Corne Versloot edited this page Apr 20, 2015 · 3 revisions

Apache Storm supports DRPC topologies, which can be useful for computer vision use cases as well (training classifiers, executing image queries on a database, etc.).

We are aware that the current implementation is based on the deprecated LinearDRPCTopologyBuilder instead of Trident, but it was easier to start with since it is closer to the StormCV primitives already in place.

StormCV uses a number of special classes in order to support DRPC topologies:

  • IRequestOp interface: Implementations of this interface can receive DRPC requests
  • RequestBolt: a basic bolt that can execute IRequestOp implementations
  • IBatchOp interface: Implementations can aggregate data for a single DRPC request. As with normal operations, it contains an execute function that receives StormCV objects. When all data for a single request has been processed, the platform calls getBatchResult to get the aggregated result.
  • IResultOp interface: Implementations of this interface return the result of DRPC requests. IResultOp works in the same way as IBatchOp, with the only difference that it must return a single String as its result.
  • BatchBolt: a basic bolt that can hold batching operations (i.e. IBatchOp and IResultOp implementations)
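
The execute / getBatchResult contract described for IResultOp can be sketched in plain Java. This is only an illustration: SimpleResultOp, LabelJoiner and run are hypothetical names, and the real StormCV interfaces have different signatures (they work on StormCV objects and are driven by a BatchBolt).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ResultOpSketch {

    // Hypothetical stand-in for the contract described above;
    // NOT the actual StormCV IResultOp signature.
    interface SimpleResultOp<T> {
        void execute(T item);      // called for every item that belongs to the request
        String getBatchResult();   // called once all items of the request are processed
    }

    // Example implementation: collects labels and returns them as one String.
    static class LabelJoiner implements SimpleResultOp<String> {
        private final List<String> labels = new ArrayList<>();

        @Override
        public void execute(String label) {
            labels.add(label);
        }

        @Override
        public String getBatchResult() {
            return String.join(",", labels);
        }
    }

    // Plays the role of the platform: feed all items, then ask for the result.
    static String run(List<String> items) {
        SimpleResultOp<String> op = new LabelJoiner();
        for (String item : items) {
            op.execute(item);
        }
        return op.getBatchResult();
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("cat", "dog", "car")));
    }
}
```

The point of the sketch is the split between per-item work (execute) and a single aggregated String that is produced only at the end of the request.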

A DRPC topology can be built by chaining normal operations that are preceded by an IRequestOp implementation and followed by an IResultOp implementation. The following snippet shows this:

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("name");
// the first bolt must be a RequestBolt with an IRequestOp implementation
builder.addBolt(new RequestBolt(new IRequestOpImplementation()), 1); 
// following bolts can have normal bolts / operations
builder.addBolt(new SingleInputBolt(new AnyOperation()), 2).shuffleGrouping();
// last bolt must be a BatchBolt holding an IResultOp implementation. Typically using a grouping on the 
// DRPC RequestID to make sure all results for a single request end up with the same task
builder.addBolt(new BatchBolt(new IResultOpImplementation()), 1)
    .fieldsGrouping(new Fields(CVParticleSerializer.REQUESTID));
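
Why grouping on the request ID matters once the final bolt runs with more than one task can be illustrated without Storm. The sketch below is a simplified stand-in for a fields grouping: taskFor and the request IDs are hypothetical, and Storm's own hashing differs in detail. It only shows that equal request IDs always select the same task.

```java
import java.util.Arrays;
import java.util.List;

public class RequestGroupingSketch {

    // Simplified stand-in for a fields grouping: the target task is derived
    // only from the request ID, so equal IDs always map to the same task.
    // Illustration only; this is not Storm's actual implementation.
    static int taskFor(String requestId, int numTasks) {
        return Math.floorMod(requestId.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4; // hypothetical parallelism of the final BatchBolt
        List<String> requestIds = Arrays.asList("req-1", "req-2", "req-1", "req-3", "req-2");
        for (String id : requestIds) {
            System.out.println(id + " -> task " + taskFor(id, numTasks));
        }
    }
}
```

With a shuffle grouping instead, partial results of one request could be spread over several tasks, and no single task would see all the data it needs to aggregate.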

Also see Example 8 in the Examples project (nl.tno.stormcv.example.drpc package).
