
High Availability Using Standby Operator Execution Instances


Algorithm

Nomenclature

See here: Flink's Jobs and Scheduling

Long story short: given a logical job vertex M, the job vertex's representation at the execution-plan level is the ExecutionJobVertex m. The latter can be parallelized according to the job's parallelism. In this example the execution is parallelized into two instances, called m_1 and m_2, which are the two ExecutionVertex instances managed by m. Each ExecutionVertex is responsible for a single Execution instance, which typically maps to a running parallel (sub)task of a specific operator.
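A minimal sketch of this naming hierarchy, with stripped-down, hypothetical class shapes (Flink's real classes carry far more state than shown, and the STANDBY value reflects this design rather than upstream Flink):

```java
// Simplified, hypothetical sketch of the naming hierarchy described above.
enum ExecutionState { CREATED, DEPLOYING, RUNNING, STANDBY, FAILED }

// Logical vertex M in the JobGraph.
class JobVertex {
    String name;
    int parallelism;
}

// Execution-plan representation m of the job vertex M.
class ExecutionJobVertex {
    JobVertex jobVertex;
    ExecutionVertex[] taskVertices; // m_1, m_2, ... one per parallel instance
}

// One parallel subtask slot m_i; manages the current Execution attempt.
class ExecutionVertex {
    ExecutionJobVertex jobVertex;
    Execution currentExecution;
}

// A single deployment attempt of a subtask.
class Execution {
    ExecutionVertex vertex;
    ExecutionState state; // e.g., RUNNING, or STANDBY in this design
}
```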

Assumptions

  • Each ExecutionVertex has a set of standby failover Executions that mirror the acknowledged state of the running Execution. The standby Executions are in the STANDBY ExecutionState, i.e., they are not running, and they receive the committed checkpoints of the running Execution as soon as these become available (see the sketch after this list).
  • Each running Execution, or more precisely its underlying subtask, keeps its in-flight records locally.
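A minimal sketch of the first assumption; StandbyExecutionVertex, restoreState, and the byte[] snapshot are illustrative names, not Flink's actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: an ExecutionVertex that keeps standby Executions and
// forwards every committed checkpoint's state to them.
class StandbyExecutionVertex {
    Execution runningExecution;
    List<Execution> standbyExecutions = new ArrayList<>();

    // Invoked once a checkpoint of the running Execution is committed.
    void onCheckpointCompleted(long checkpointId, byte[] stateSnapshot) {
        for (Execution standby : standbyExecutions) {
            standby.restoreState(checkpointId, stateSnapshot); // mirror the state
        }
    }
}

class Execution {
    ExecutionState state = ExecutionState.STANDBY; // not running, just mirroring

    void restoreState(long checkpointId, byte[] stateSnapshot) {
        // apply the committed snapshot so a later promotion can start from it
    }
}

enum ExecutionState { STANDBY, RUNNING }
```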

Algorithm

Consider a job with three tasks (hiding n-to-m communication for simplicity), where each operator simply counts its input events and forwards them as outputs.

We mark as |e1| the barrier that serves as the marker for epoch #1. Moreover, we mark as t1 the first, most upstream task, followed by t2 and t3 as the data heads downstream. Finally, y denotes the events that belong to epoch 1 and x denotes the events that belong to epoch 2.

At moment 1:

x x x x x x x x |e1| -> t1 -> y y y -> t2 -> y y y -> t3 -> y y

At moment 2:

x x x x -> t1 -> x x x -> t2 -> x |e1| -> t3 -> y y y y y y y y

At moment 3:

x x x x -> t1 -> x x -> t2 -> x -> t3 -> x |e1| y y y y y y y y

We checkpoint the counter value 8 as the state of each task. The checkpoint resides in stable storage. Once the checkpoint is complete, the checkpointed state of each running Execution is copied to its associated standby Executions (tasks). At the same time, each individual task stores its in-flight records locally (i.e., the records that have not yet been covered by a checkpointed epoch).
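A minimal sketch of such a local in-flight log; InFlightLog, its methods, and the per-epoch layout are assumptions for illustration:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-task log of emitted records, grouped by epoch.
class InFlightLog<T> {
    private final Map<Long, Deque<T>> recordsByEpoch = new HashMap<>();

    // Record every emitted element under the epoch it belongs to.
    void append(long epoch, T record) {
        recordsByEpoch.computeIfAbsent(epoch, e -> new ArrayDeque<>()).add(record);
    }

    // Replay the records of a given (uncheckpointed) epoch, e.g., the x's.
    Iterable<T> replay(long epoch) {
        return recordsByEpoch.getOrDefault(epoch, new ArrayDeque<>());
    }

    // Records of epochs up to a completed checkpoint are safe to drop.
    void truncate(long completedEpoch) {
        recordsByEpoch.keySet().removeIf(epoch -> epoch <= completedEpoch);
    }
}
```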

On failure of task t2 at moment 3:

  1. t2 is replaced by an associated standby Execution, which has already received t2's state from checkpoint |e1|. The new t2 sets up its input and output channels and starts running, ready to accept new tuples.
  2. t1 has to replay the stream of all x's, since they were not part of epoch |e1|; only the y's were. (A combined sketch of both steps follows this list.)
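A combined sketch of the two steps, reusing the hypothetical StandbyExecutionVertex and InFlightLog types from the earlier sketches (FailoverCoordinator and its method are likewise illustrative):

```java
// Hypothetical failover logic: promote a standby for the failed t2, then have
// the upstream t1 replay everything emitted after the last completed epoch.
class FailoverCoordinator {
    void onTaskFailure(StandbyExecutionVertex failedT2,
                       InFlightLog<Object> t1Log,
                       long lastCompletedEpoch) {
        // Step 1: promote a standby that already restored the |e1| state.
        Execution standby = failedT2.standbyExecutions.remove(0);
        standby.state = ExecutionState.RUNNING; // set up channels, start consuming
        failedT2.runningExecution = standby;

        // Step 2: t1 replays the records of the epoch after the checkpoint
        // (all the x's, which were never part of |e1|).
        for (Object record : t1Log.replay(lastCompletedEpoch + 1)) {
            // re-send record over the rebuilt channel to the new t2 ...
        }
    }
}
```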

Problem: t3 will see the same x twice, because t2 will replay it. Idempotency therefore has to be ensured. We could implement this via offsets between pairs of connected operators (tasks) in the job graph, as sketched below.
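A minimal sketch of the offset idea, under the assumption that every (sender, receiver) channel numbers its records monotonically (DedupChannel and accept are illustrative names):

```java
// Hypothetical receiver-side filter: t3 tracks the highest offset it has
// processed per input channel and silently drops replayed duplicates.
class DedupChannel<T> {
    private long lastSeenOffset = -1;

    /** Returns true iff the record is new and should be processed. */
    boolean accept(long offset, T record) {
        if (offset <= lastSeenOffset) {
            return false; // duplicate from t2's replay; skip the repeated x
        }
        lastSeenOffset = offset;
        return true;
    }
}
```

Note that lastSeenOffset would itself need to be part of t3's checkpointed state, so that the filter also survives a failure of t3.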

Storage of in-flight records

There are two choices:

  1. We store the in-flight records of each epoch within the checkpoint that is kept in stable storage.
  2. We store the in-flight records on the local machine of each execution vertex.

Case #1 does not modify the algorithm above.

Case #2 can cause the following issue: if t3 fails after t2 has been recovered on a machine that does not hold t2's in-flight records, we have to replay (recursively) from the closest upstream operator that still stores in-flight records for the previous epoch. In this case, t1 can replay all the tuples emitted after |e1| was checkpointed.
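A minimal sketch of this recursive walk upstream; Task, upstream, and hasInFlightRecords are illustrative names:

```java
// Hypothetical search for the closest upstream task that can still replay.
class Task {
    Task upstream;              // null for the most upstream task, e.g., t1
    boolean hasInFlightRecords; // false if the host machine lost them

    static Task findReplaySource(Task failed) {
        Task candidate = failed.upstream;
        while (candidate != null && !candidate.hasInFlightRecords) {
            candidate = candidate.upstream; // keep walking upstream
        }
        // null means even the most upstream task lost its records and
        // recovery must restart from the checkpointed sources.
        return candidate;
    }
}
```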