Dan Debrunner edited this page Sep 29, 2015 · 11 revisions

Ideas for threading and fusion.

Default to multithreading to utilise multiple cores.

The general idea is that by default any application should perform well, by naturally taking advantage of multiple cores.

  • By default any Java functional logic (TStream.filter, TStream.transform etc.) utilises its own thread to process tuples. This would be achieved by the functional Java operators implementing a Java threaded port (using standard Java Queue implementations) and a thread from the operator's scheduled executor. Thus a pipeline within the same PE would take advantage of multiple threads. This only provides a benefit in distributed mode if the operators are fused together.
  • A mechanism is needed to determine when threading is not required. For low latency cases a direct method call is preferred: in a pipeline like s.filter(x).transform(y).filter(z), the same Java thread executes x.test(), y.apply(), and z.test(), effectively: if (x.test(t)) { if (z.test(y.apply(t))) { ...} }
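The two execution styles described above can be contrasted in a minimal, self-contained sketch. It uses only java.util.function and java.util.concurrent and is not the toolkit's implementation: the class ThreadingSketch, its methods direct and threaded, the queue capacity of 16, and the use of an empty Optional as an end-of-stream marker are all assumptions made for illustration. A plain thread stands in for the operator's scheduled executor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical illustration class; not part of the topology API.
class ThreadingSketch {

    /** Low latency form: x, y and z all run on the calling thread. */
    static List<Integer> direct(List<Integer> in, Predicate<Integer> x,
            Function<Integer, Integer> y, Predicate<Integer> z) {
        List<Integer> out = new ArrayList<>();
        for (Integer t : in) {
            if (x.test(t)) {
                Integer u = y.apply(t);
                if (z.test(u)) {
                    out.add(u);
                }
            }
        }
        return out;
    }

    /** Threaded form: a bounded queue (standing in for a threaded port) decouples x from y and z. */
    static List<Integer> threaded(List<Integer> in, Predicate<Integer> x,
            Function<Integer, Integer> y, Predicate<Integer> z) {
        BlockingQueue<Optional<Integer>> port = new ArrayBlockingQueue<>(16);
        List<Integer> out = new ArrayList<>();
        Thread downstream = new Thread(() -> {
            try {
                // An empty Optional marks end of stream (an assumption of this sketch).
                for (Optional<Integer> t = port.take(); t.isPresent(); t = port.take()) {
                    Integer u = y.apply(t.get());
                    if (z.test(u)) {
                        out.add(u);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        downstream.start();
        try {
            for (Integer t : in) {
                if (x.test(t)) {
                    port.put(Optional.of(t)); // blocks when the port is full
                }
            }
            port.put(Optional.empty());
            downstream.join(); // join() makes the downstream thread's writes to 'out' visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while feeding the port", e);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> in = Arrays.asList(1, 2, 3, 4, 5, 6);
        Predicate<Integer> x = t -> t % 2 == 0;      // keep even values
        Function<Integer, Integer> y = t -> t * 10;  // scale
        Predicate<Integer> z = t -> t > 20;          // keep values above 20
        System.out.println(direct(in, x, y, z));
        System.out.println(threaded(in, x, y, z));
    }
}
```

Both forms produce the same tuples in the same order. The threaded form lets the two stages overlap at the cost of a queue hand-off per tuple, which is the latency the direct-call form avoids.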

See Stream Characteristics

Placement of topology elements

At the base level stream processing is implemented by some code (a function or an SPL operator) executing against 0-N input streams and producing 0-M output streams. The code must obviously execute within an operating system process, and thus there are two factors for deciding the mapping of code to processes.

  1. Placement Directives - Directives set by the application, which must be honoured in any distributed execution.
  2. Placement Strategy - Strategy for deciding how the complete application is broken into operating system processes, while honouring the directives. Multiple placement strategies may exist and may be selected at submission time, thus different strategies applied to the same application may result in a different number of operating system processes.

Placement Directives

  • TStream.isolate() - Directs that any immediate downstream processing is in a separate process.
  • TStream.lowLatency() - Directs that all processing in a pipeline until TStream.endLowLatency() is in the same operating system process and no threaded ports/queues are inserted. This will typically result in the processing occurring on a single thread, but non-functional operators may utilize additional threads.
  • Placeable.colocate(...) - Directs that specified elements must execute within the same operating system process. Note that there is no requirement that the elements are connected directly or indirectly.
    • [dlaboss] Q: given the "same thread" semantics of low latency regions, it seems like there's a place for a "colocate but not in the same thread" region concept - e.g., stream.colocate(). ... .endColocate()... or maybe instead stream.lowLatency(false/*same thread*/). ... endLowLatency()?
  • Placeable.addResourceTags(...) - Directs that the processing must execute on a resource (host) that is assigned all the tags. Note that when elements are colocated, the resource must be assigned the union of the resource tags of all the colocated elements.
  • [dlaboss] TODO: parallel regions (parallel()/endParallel()): while not a placement directive, and maybe the only guarantees occur in the context of a particular placement strategy, need to describe the resulting fusion: fusion within a channel, fusion among the channels in the region. Also need to describe the effects of various constructs involving parallel() and placement directives and/or how to specify/achieve various effects such as:
    • each channel in the region is to be in a separate private container; single thread for the whole channel (low latency) / separate thread per functional logic op.
    • the entire region in a single container AND colocated-with/isolated-from the TStream that was parallelized. Similarly, colocated-with/isolated-from the result of TStream.endParallel(); colocated-with some arbitrary TStream.
    • in the context of the Connected into Single Process strategy, it feels like the default behavior should be that a parallel region only introduces a thread at the head of each channel and the region remains colocated with the rest of the flow that it's part of. To get low latency (single thread) within a channel, one might/would have to do parallel(2).lowLatency() ... endLowLatency().endParallel()? To get per-channel containers: parallel(2).isolate() ... endParallel()?
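The resource tag rule for colocated elements, stated under Placeable.addResourceTags(...) above, amounts to a set union followed by a subset check. A minimal sketch, assuming tags are plain strings; the class ResourceTagSketch and its methods requiredTags and hostQualifies are hypothetical names for illustration and are not part of the topology API:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration class; not part of the topology API.
class ResourceTagSketch {

    /** Tags required by a colocated group: the union of each element's tags. */
    static Set<String> requiredTags(List<Set<String>> elementTags) {
        Set<String> union = new HashSet<>();
        for (Set<String> tags : elementTags) {
            union.addAll(tags);
        }
        return union;
    }

    /** A host qualifies only when it has been assigned every required tag. */
    static boolean hostQualifies(Set<String> hostTags, Set<String> required) {
        return hostTags.containsAll(required);
    }

    public static void main(String[] args) {
        Set<String> a = new HashSet<>(Arrays.asList("ingest"));
        Set<String> b = new HashSet<>(Arrays.asList("database", "ssd"));
        Set<String> required = requiredTags(Arrays.asList(a, b));
        Set<String> host = new HashSet<>(Arrays.asList("ingest", "database"));
        System.out.println(required);
        System.out.println(hostQualifies(host, required));
    }
}
```

In this example the host lacks the "ssd" tag, so it would not qualify for the colocated pair even though it satisfies the first element on its own.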

Placement Strategies

Connected into Single Process

  • Goal: All connected elements are implicitly colocated in the same operating system process
  • Exceptions:
    • s.isolate() is handled as if s were disconnected from any immediate downstream element.
    • Explicitly colocated elements are handled as if they were disconnected from any other element they are not colocated with.
    • Elements with resource tags are handled as if they were disconnected from any other element with a different or empty set of resource tags.
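One possible reading of this strategy is a union-find pass over the topology graph: each connection fuses its two endpoints unless one of the exceptions applies, and explicitly colocated elements are fused even when unconnected. The sketch below is an assumption-laden illustration, not the toolkit's algorithm. The Element model, the colocateGroup and isolatedOutput fields, and the methods add, connect and sameProcess are all hypothetical, and interactions such as resource tag union within a colocation group are deliberately left out.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical illustration class; not part of the topology API.
class SingleProcessStrategySketch {

    /** Hypothetical element model carrying the placement directives. */
    static final class Element {
        final String name;
        final String colocateGroup;    // null when not explicitly colocated
        final boolean isolatedOutput;  // true when isolate() was applied to its output stream
        final Set<String> resourceTags;

        Element(String name, String colocateGroup, boolean isolatedOutput, String... tags) {
            this.name = name;
            this.colocateGroup = colocateGroup;
            this.isolatedOutput = isolatedOutput;
            this.resourceTags = new HashSet<>(Arrays.asList(tags));
        }
    }

    private final Map<Element, Element> parent = new HashMap<>();
    private final Map<String, Element> groupAnchor = new HashMap<>();

    private Element find(Element e) {
        Element p = parent.getOrDefault(e, e);
        if (p == e) {
            return e;
        }
        Element root = find(p);
        parent.put(e, root); // path compression
        return root;
    }

    private void union(Element a, Element b) {
        parent.put(find(a), find(b));
    }

    /** Registers an element; explicitly colocated elements are fused even when unconnected. */
    void add(Element e) {
        parent.putIfAbsent(e, e);
        if (e.colocateGroup != null) {
            Element anchor = groupAnchor.putIfAbsent(e.colocateGroup, e);
            if (anchor != null) {
                union(anchor, e);
            }
        }
    }

    /** Fuses a connection unless one of the strategy's exceptions treats it as disconnected. */
    void connect(Element up, Element down) {
        if (up.isolatedOutput) {
            return; // isolate(): cut from immediate downstream elements
        }
        if (!Objects.equals(up.colocateGroup, down.colocateGroup)) {
            return; // explicit colocation: cut from elements outside the group
        }
        if (!up.resourceTags.equals(down.resourceTags)) {
            return; // resource tags: cut from a different or empty tag set
        }
        union(up, down);
    }

    boolean sameProcess(Element a, Element b) {
        return find(a) == find(b);
    }

    public static void main(String[] args) {
        SingleProcessStrategySketch s = new SingleProcessStrategySketch();
        Element src = new Element("src", null, false);
        Element filter = new Element("filter", null, true); // isolate() on its output
        Element sink = new Element("sink", null, false);
        for (Element e : Arrays.asList(src, filter, sink)) {
            s.add(e);
        }
        s.connect(src, filter);
        s.connect(filter, sink);
        System.out.println(s.sameProcess(src, filter));
        System.out.println(s.sameProcess(filter, sink));
    }
}
```

The number of distinct roots after all connections are processed is the number of operating system processes this reading of the strategy would produce.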