Kafka Executor

James Vaughan edited this page Nov 14, 2018 · 11 revisions

Purpose

The KafkaExecutor is a type of ProcessExecutor that saves its current state using Kafka. This allows it to resume after a planned shutdown or a crash.

Components

Object: KafkaExecutor

Responsible for implementing the "ProcessExecutor" interface and for managing the shutdown behaviour of any local components. Not all components need to be present locally, provided they are running on the Kafka cluster; at minimum, however, the KafkaExecutor requires a local "ResultsListener" to fulfil result futures.

To "execute" a process, it generates a PiInstance with a new ID locally, sends a ReduceRequest containing that PiInstance, and then uses a promise listener to wait for the result or exception and respond with it.
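The execute flow can be sketched as below. This is a minimal, self-contained illustration under stated assumptions, not the real KafkaExecutor: `SketchPii`, `SketchReduceRequest`, `ExecuteFlowSketch`, `send` and `onOutcome` are hypothetical names, the Kafka producer is abstracted as a plain function, and `execute` returns the new ID alongside the future only to make the sketch easy to test.

```scala
import java.util.UUID
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Future, Promise}
import scala.util.{Failure, Success, Try}

// Hypothetical stand-ins for PiInstance and ReduceRequest (the real types carry much more).
final case class SketchPii(id: UUID, process: String)
final case class SketchReduceRequest(pii: SketchPii, args: Seq[Any])

// Sketch of the "execute" flow. `send` abstracts the producer for the ReduceRequest
// topic; it is an assumption, not the real API.
final class ExecuteFlowSketch(send: SketchReduceRequest => Unit) {
  // Promises waiting for an outcome, keyed by PiInstance ID.
  private val pending = TrieMap.empty[UUID, Promise[Any]]

  /** Creates a PiInstance with a fresh local ID, sends it for reduction and
    * returns the ID together with a future for its outcome. */
  def execute(process: String): (UUID, Future[Any]) = {
    val pii = SketchPii(UUID.randomUUID(), process)
    val promise = Promise[Any]()
    pending.put(pii.id, promise)              // register first so an early result cannot be missed
    send(SketchReduceRequest(pii, Seq.empty)) // no call results to post yet
    (pii.id, promise.future)
  }

  /** Called by the local ResultsListener when a result or exception is observed. */
  def onOutcome(id: UUID, outcome: Try[Any]): Unit =
    pending.remove(id).foreach(_.complete(outcome))
}
```

Registering the promise before sending the ReduceRequest matters in this sketch because the outcome could otherwise reach the listener before anything is waiting for it.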

Topic: ReduceRequest

A stream of ReduceRequests, which consist of the PiInstance to reduce along with any returned AtomicProcess results which need to be posted to that instance.

/** Contains the necessary information for reducing a PiInstance.
  *
  * @param pii The latest PiInstance to reduce and post-results into.
  * @param args A collection of result objects for open threads that need to be posted.
  */
case class ReduceRequest(
  pii:  PiInstance[ObjectId],
  args: Seq[CallResult]
) extends AnyMsg with HasPii

Object: Reducer

Consumes the ReduceRequest topic, posts the results into each PiInstance and reduces it. Produces Assignment messages for AtomicProcesses when reduction opens new threads; otherwise it posts the results to the "Results" topic. It also posts the latest PiInstance state to the "PiiHistory" topic.
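The Reducer's routing can be sketched as a single step function. This is a hedged illustration only: `MiniPii`, `MiniReduceRequest`, `MiniAssignment`, `MiniResult`, `MiniPiiUpdate` and `reducerStep` are hypothetical simplifications, the real reduction is passed in as a plain `reduce` function, and the sketch logs the new state to PiiHistory on every step, which may differ from the real component.

```scala
// Hypothetical minimal PiInstance: call results posted so far (callRef -> value), the
// threads opened by the last reduction (callRef -> process name) and an optional result.
final case class MiniPii(
    id: String,
    posted: Map[Int, String],
    opened: Seq[(Int, String)],
    result: Option[String]
)

final case class MiniReduceRequest(pii: MiniPii, args: Seq[(Int, String)])

// Hypothetical outputs: an Assignment, a Results-topic entry and a PiiHistory entry.
sealed trait ReducerOut
final case class MiniAssignment(piiId: String, callRef: Int, process: String) extends ReducerOut
final case class MiniResult(piiId: String, value: String) extends ReducerOut
final case class MiniPiiUpdate(pii: MiniPii) extends ReducerOut

// Sketch of one Reducer step; `reduce` stands in for the real PiInstance reduction.
def reducerStep(req: MiniReduceRequest, reduce: MiniPii => MiniPii): Seq[ReducerOut] = {
  // 1. Post the returned AtomicProcess results into the instance.
  val posted = req.pii.copy(posted = req.pii.posted ++ req.args)
  // 2. Reduce it.
  val reduced = reduce(posted)
  // 3. Newly opened threads become Assignments; otherwise a finished instance yields a result.
  val routed: Seq[ReducerOut] =
    if (reduced.opened.nonEmpty)
      reduced.opened.map { case (ref, proc) => MiniAssignment(reduced.id, ref, proc) }
    else reduced.result.map(v => MiniResult(reduced.id, v)).toList
  // 4. In this sketch the latest state is always logged to PiiHistory.
  routed :+ MiniPiiUpdate(reduced)
}
```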

Topic: Assignment

A stream of AtomicProcess calls that need computing. Each message contains the process to call and the arguments to use, as well as the information needed to uniquely identify it: the PiInstance ID and the call ID (an incrementing ID for each AtomicProcess call made by that PiInstance).

/** Emitted by the Reducer to the AtomicProcess executors, assigns responsibility for
  * executing an AtomicProcess to an AtomicProcessExecutor. Uniquely identified by the
  * (PiiId, CallRef) pair.
  *
  * @param pii The PiInstance state when the Assignment was created.
  * @param callRef The ID of this call in the PiInstance.
  * @param process The name of the AtomicProcess to call.
  * @param args The value of each of the arguments to the AtomicProcess.
  */
case class Assignment(
  pii:      PiInstance[ObjectId],
  callRef:  CallRef,
  process:  String,
  args:     Seq[PiResource]
) extends AnyMsg with HasPii
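Why the (PiiId, CallRef) pair is unique can be shown with a small sketch: call IDs are only incremented per PiInstance, so they repeat across instances, but never within one. `CallKey` and `CallNumbering` are hypothetical names; the real code uses `ObjectId` and `CallRef`, which are replaced here by `String` and `Int` for illustration.

```scala
// Hypothetical stand-in for the (PiiId, CallRef) pair; the real types are ObjectId and CallRef.
final case class CallKey(piiId: String, callRef: Int)

// Sketch: each PiInstance numbers its AtomicProcess calls with its own incrementing counter,
// so call refs repeat across instances but the (piiId, callRef) pair stays unique.
final class CallNumbering(piiId: String) {
  private var next = 0
  def newCall(): CallKey = {
    val key = CallKey(piiId, next)
    next += 1
    key
  }
}
```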

Object: AtomicProcessExecutor

Consumes the Assignment topic, executes AtomicProcesses and posts SequenceRequests with the results. It catches all exceptions; various handling strategies are possible, but ultimately it may have to emit a SequenceFailure to fail the PiInstance and clean up after it.
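A single AtomicProcessExecutor step might look roughly like this. It is a sketch under assumptions: `Job`, `SeqRequest`, `SeqFailure` and `runAssignment` are hypothetical simplifications of Assignment, SequenceRequest and SequenceFailure, processes are looked up in a plain `Map`, and every error immediately fails the call, whereas the real executor may apply other handling first.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical simplified messages: the real ones are Assignment, SequenceRequest and SequenceFailure.
final case class Job(piiId: String, callRef: Int, process: String, args: Seq[String])
sealed trait HistoryOut
final case class SeqRequest(piiId: String, callRef: Int, result: String) extends HistoryOut
final case class SeqFailure(piiId: String, callRef: Int, error: Throwable) extends HistoryOut

// Sketch: look up the AtomicProcess by name, run it and turn the outcome into a PiiHistory
// message. Try catches every non-fatal exception, including an unknown process name.
// This sketch always fails the call on error; the real executor may attempt other handling first.
def runAssignment(job: Job, processes: Map[String, Seq[String] => String]): HistoryOut =
  Try(processes(job.process)(job.args)) match {
    case Success(value) => SeqRequest(job.piiId, job.callRef, value)
    case Failure(error) => SeqFailure(job.piiId, job.callRef, error)
  }
```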

Topic: PiiHistory

Stores three kinds of message:

  1. PiiUpdate: The latest state for a given PiInstance.
  2. SequenceRequest: The results of AtomicProcess calls that need posting to their PiInstance.
  3. SequenceFailure: The outstanding tasks that need mopping up after an exception.

/** Emitted by the Reducer component to log the latest PiInstance state to the PiiHistory.
  * Consumed by the Sequencer component to construct up-to-date ReduceRequests.
  *
  * @param pii The latest state of the PiInstance.
  */
case class PiiUpdate(
  pii:      PiInstance[ObjectId]
) extends PiiHistory with HasPii

/** Emitted by AtomicProcess executors to sequence their results into a common timeline.
  * Consumed by Sequencers to produce ReduceRequests.
  *
  * @param piiId ID of the PiInstance being executed, PiInstance state needs to be fetched
  *              from the PiiHistory topic.
  * @param request The result of the AtomicProcess call, containing the unique id and a
  *                PiObject representing the result.
  */
case class SequenceRequest(
  piiId: ObjectId,
  request: CallResult
) extends PiiHistory

/** A PiiHistory message which helps collect outstanding SequenceRequests after a failure.
  *
  * @param pii Identifying information for the PiInstance. Just an ObjectId if the state
  *            hasn't been seen, or the latest PiInstance.
  *
  * @param returns All the known results for the returned calls, the PiObjects may be null
  *                in the event of a failure.
  *
  * @param errors All the known exceptions encountered during the execution of this PiInstance.
  */
case class SequenceFailure(
  pii:  Either[ObjectId, PiInstance[ObjectId]],
  returns: Seq[CallResult],
  errors: Seq[PiExceptionEvent[ObjectId]]
) extends PiiHistory { ... }

Object: Sequencer

Consumes the PiiHistory topic, matches up any pending SequenceRequests with the latest PiInstance and posts ReduceRequests for the Reducer.

NOTE (diagram legend): U = PiiUpdate, SR = SequenceRequest, Red = message for the "Red" PiInstance, Blue = message for the "Blue" PiInstance.

The challenge with the Sequencer is that, if multiple PiInstances are assigned to a single PiiHistory partition, the resulting ReduceRequests cannot always be sent immediately (see above). If a SequenceRequest arrives without a matching PiiUpdate, we must not commit its offset until a matching PiiUpdate is present, or the SequenceRequest could be lost in the event of a shutdown. PiiUpdates, by contrast, may be sent on without waiting, as they will return to the PiiHistory after a redundant pass through the Reducer.
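One possible reading of these rules for a single partition is sketched below; it is an illustration under assumptions, not the real Sequencer. `Upd`, `SReq`, `ReduceReq` and `SequencerSketch` are hypothetical simplifications (states and results are plain strings), and `commit` assumes that held PiiUpdates are re-sent as ReduceRequests with no results before the offset is committed, while unmatched SequenceRequests hold the committed offset back.

```scala
import scala.collection.mutable

// Hypothetical simplified PiiHistory messages, each tagged with its partition offset.
sealed trait HistMsg { def piiId: String; def offset: Long }
final case class Upd(piiId: String, state: String, offset: Long) extends HistMsg   // PiiUpdate
final case class SReq(piiId: String, result: String, offset: Long) extends HistMsg // SequenceRequest

// Hypothetical ReduceRequest: the state to reduce plus the results to post into it.
final case class ReduceReq(piiId: String, state: String, results: Seq[String])

// Sketch of a Sequencer for ONE PiiHistory partition, read in offset order.
final class SequencerSketch {
  private val latest  = mutable.Map.empty[String, Upd]          // newest unsent state per instance
  private val waiting = mutable.Map.empty[String, Vector[SReq]] // results still lacking a state
  private var nextOffset = 0L                                   // offset after the last message read

  /** Handles one message; returns a ReduceRequest if the instance can be reduced now. */
  def onMessage(m: HistMsg): Option[ReduceReq] = {
    nextOffset = m.offset + 1
    m match {
      case u: Upd  => latest(u.piiId) = u
      case r: SReq => waiting(r.piiId) = waiting.getOrElse(r.piiId, Vector.empty) :+ r
    }
    // Emit as soon as an instance has both a state and at least one pending result.
    for {
      u  <- latest.get(m.piiId)
      rs <- waiting.get(m.piiId)
    } yield {
      latest -= m.piiId
      waiting -= m.piiId
      ReduceReq(m.piiId, u.state, rs.map(_.result))
    }
  }

  /** Prepares a commit. Held states are re-sent with no results (they come back after a
    * redundant pass through the Reducer), but unmatched results hold the offset back.
    * Returns the requests to send and the offset to commit (Kafka commits the next offset to read). */
  def commit(): (Seq[ReduceReq], Long) = {
    val resend = latest.values.map(u => ReduceReq(u.piiId, u.state, Seq.empty)).toSeq
    latest.clear()
    val safe =
      if (waiting.isEmpty) nextOffset
      else waiting.values.flatten.map(_.offset).min
    (resend, safe)
  }
}
```

Committing at the earliest unmatched SequenceRequest means some later messages may be read again after a restart; this sketch assumes such replays are tolerated downstream.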

Topic: Results

Would be better named "PiiLogs" or "PiEvents": it stores the history of all PiEvents.

/** An output message sent to the Results topic. Has no impact on PiInstance execution,
  * but is used by `ResultListeners` to implement the `PiObservable` interface.
  *
  * @param event The `PiEvent` being logged.
  */
case class PiiLog( event: PiEvent[ObjectId] ) extends AnyMsg { ... }

Object: ResultsListener

"ResultsListener"s do no work per se. Each one has its own consumer group, so every ResultsListener receives all PiEvents (a broadcast/subscribe pattern). It implements the "PiObservable" trait for the cluster and the KafkaExecutor.
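Giving each listener its own consumer group comes down to consumer configuration. The sketch below assumes the standard Kafka consumer settings `bootstrap.servers` and `group.id`; `resultsListenerProps` and the `results-listener-` prefix are hypothetical, and the deserializer settings a real consumer would also need are omitted.

```scala
import java.util.{Properties, UUID}

// Sketch: build consumer properties for one ResultsListener with a unique consumer group.
// Kafka shares a topic's partitions among the members of one group, but delivers every
// record to each distinct group, so unique group IDs give every listener all PiEvents.
def resultsListenerProps(bootstrapServers: String): Properties = {
  val props = new Properties()
  props.setProperty("bootstrap.servers", bootstrapServers)
  // Hypothetical naming scheme: a fresh random group ID per listener instance.
  props.setProperty("group.id", s"results-listener-${UUID.randomUUID()}")
  props
}
```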