
Consistent Region Support for Messaging Toolkit

Alex-Cook4 edited this page Dec 7, 2015 · 14 revisions

This document describes the design of consistent region support for each of the operators in the Messaging Toolkit.

Kafka

  • KafkaProducer cannot be placed at the start of a consistent region.

  • KafkaConsumer supports exactly-once processing in a consistent region for a specified partition. It uses the low-level Kafka SimpleConsumer API and supports replaying data for consistent regions. This support applies only to operators that consume a single partition of a single topic. Multi-partition topics can be consumed using multiple operators or, more simply, through user-defined parallelism (UDP).

On Drain: No action necessary

On Checkpoint: The operator saves the offset in the message log from which it is currently reading.

On Reset: The operator resumes reading from the saved offset in the message log.

On ResetToInit: There is currently no reliable way to reset to the initial state. If the consumer is forced to reset to its initial state, it begins reading from the latest messages.
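The per-method behavior above can be sketched as a small, self-contained Python model. It is not the toolkit's Java implementation and does not use the Kafka client API: an in-memory list stands in for one partition's message log, and the class and method names are hypothetical. What it illustrates is that the read offset is the consumer's entire checkpointed state.

```python
class OffsetConsumerSketch:
    """Hypothetical model of a single-partition consumer in a consistent region."""

    def __init__(self, log, start_offset=0):
        self.log = log              # stands in for one partition's message log
        self.offset = start_offset  # next position to read

    def poll(self, max_messages=1):
        # Read up to max_messages starting at the current offset.
        batch = self.log[self.offset:self.offset + max_messages]
        self.offset += len(batch)
        return batch

    def drain(self):
        pass  # nothing is buffered, so no action is necessary

    def checkpoint(self):
        # The only state that must be saved is the read offset.
        return self.offset

    def reset(self, saved_offset):
        # Resume reading (and therefore replay) from the saved offset.
        self.offset = saved_offset

    def reset_to_init(self):
        # No reliable initial state: skip to the latest messages instead.
        self.offset = len(self.log)


if __name__ == "__main__":
    consumer = OffsetConsumerSketch(["m0", "m1", "m2", "m3"])
    print(consumer.poll(2))        # ['m0', 'm1']
    saved = consumer.checkpoint()  # saved offset is 2
    print(consumer.poll(2))        # ['m2', 'm3']
    consumer.reset(saved)
    print(consumer.poll(2))        # ['m2', 'm3'] again, replayed after reset
```

Because Kafka retains the log independently of the consumer, replay only requires moving the offset back; no message has to be buffered by the operator.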

JMS

  • JMSSource is being developed to participate in a consistent region. It uses the JMS CLIENT_ACKNOWLEDGE mode to support replay of messages in consistent regions. The operator supports both operator-driven and periodic consistent region configurations. When operator-driven mode is selected, the "triggerCount" parameter is required: the operator initiates a drain as soon as the number of messages specified by "triggerCount" has been processed.
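A minimal sketch of the operator-driven trigger, assuming a hypothetical `request_drain` callback that stands in for the operator asking the consistent region to drain (the real operator uses the IBM Streams operator API, which is not modeled here). `TriggerCountSketch` and its members are illustrative names only.

```python
from typing import Callable


class TriggerCountSketch:
    """Hypothetical model of operator-driven draining every trigger_count messages."""

    def __init__(self, trigger_count: int, request_drain: Callable[[], None]):
        if trigger_count <= 0:
            raise ValueError("triggerCount must be a positive integer")
        self.trigger_count = trigger_count
        self.request_drain = request_drain  # stand-in for starting a drain
        self.processed = 0                  # messages since the last drain request

    def on_message(self, message) -> None:
        # A real operator would submit the message as a tuple here.
        self.processed += 1
        if self.processed == self.trigger_count:
            self.processed = 0
            self.request_drain()


if __name__ == "__main__":
    drains = []
    source = TriggerCountSketch(3, lambda: drains.append("drain"))
    for i in range(7):
        source.on_message(f"msg{i}")
    print(len(drains))  # 2: after the 3rd and the 6th message
```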

There are two main scenarios in which messages on a queue are consumed:

  1. Single message consumer: This case is relatively simple to support because messages always go to the only consumer. The drain/checkpoint/reset actions described below ensure that the consistent region contract is met.

  2. Multiple message consumers: When multiple consumers read from the same queue and one consistent region resets its JMSSource operator, the messaging system may redeliver the previously received messages to a different consumer rather than to the same JMSSource instance. This case therefore requires deterministic routing, so that when a JMSSource instance recovers from a reset, it is guaranteed to receive again the same set of messages it received before the reset.

To achieve deterministic routing, we introduced the messageSelector parameter, which is the JMS message selector. With it, only messages whose properties match the selector expression are delivered to the operator.

The JMSSource operator cannot verify whether deterministic routing is fully in place; it relies on the Streams application designer to ensure that deterministic routing is in place before putting JMSSource in a consistent region.
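One way to get deterministic routing, sketched under stated assumptions: the message producer sets a hypothetical integer property `partitionId` on every message, and each JMSSource instance (for example, each UDP channel) is given the selector `partitionId = <channel>`. Because each message then matches exactly one selector, a redelivered message can only go back to the instance that originally received it. The Python helpers below only emulate how a broker would apply such equality selectors; they are not a JMS selector parser, and `routing_is_deterministic` is the kind of offline check the application designer, not the operator, would have to perform.

```python
def selector_for(channel, prop="partitionId"):
    """Build the JMS message selector string for one JMSSource instance."""
    return f"{prop} = {channel}"


def eligible(messages, channel, prop="partitionId"):
    """Emulate a broker applying the equality selector for this channel."""
    return [m for m in messages if m["properties"].get(prop) == channel]


def routing_is_deterministic(messages, n_channels, prop="partitionId"):
    """True if every message matches exactly one channel's selector."""
    return all(
        sum(m["properties"].get(prop) == c for c in range(n_channels)) == 1
        for m in messages
    )


if __name__ == "__main__":
    msgs = [{"id": i, "properties": {"partitionId": i % 2}} for i in range(6)]
    print(selector_for(1))                       # partitionId = 1
    print([m["id"] for m in eligible(msgs, 1)])  # [1, 3, 5]
    print(routing_is_deterministic(msgs, 2))     # True
```

A message that lacks the property, or whose value falls outside the channel range, matches no selector and would never be consumed, which is exactly the kind of gap the operator itself cannot detect.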

The JMSSource operator uses the JMS client acknowledge mode to support consistent regions: at checkpoint, it removes the previously received messages from the queue by acknowledging them, and at reset, it asks the messaging server to resend the messages received since the last successful checkpoint. However, even if JMSSource returns successfully from its checkpoint method, a downstream operator may still fail its checkpoint, which causes the whole checkpoint to fail. If JMSSource had already acknowledged its messages at that point, it could no longer replay them. JMSSource therefore performs a delayed checkpoint, acknowledging messages only when processing resumes, as explained in the following section.

Actions performed by the StateHandler methods:

On Drain: No actions needed.

On Checkpoint: The operator sets a flag (doCheckpoint) to true to indicate that a checkpoint has been made. The flag is checked when the operator resumes processing messages; if it is still true, the entire checkpoint succeeded and the operator can safely acknowledge the received messages.

On Reset: The operator resets the doCheckpoint flag to false so that no messages are acknowledged when processing resumes, and recovers the JMS session so that messages are replayed. The operator also performs message deduplication to detect and ignore duplicate messages. This extra step is needed in the following case: a checkpoint succeeds and the operator resumes; it first checks the doCheckpoint flag (which is true), but before it can acknowledge the messages, something goes wrong and a reset is required. After the reset, the operator receives the same set of messages, which should already have been removed from the queue because of the last successful checkpoint. Deduplication filters out these duplicate messages.

On ResetToInit: The operator resets the doCheckpoint flag to false so that no messages are acknowledged when processing resumes, and recovers the JMS session so that messages are replayed.
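The interplay between the doCheckpoint flag, the delayed acknowledgement, and deduplication can be modeled as follows. This is a hypothetical Python sketch, not the operator's code: `FakeClientAckQueue` mimics the relevant CLIENT_ACKNOWLEDGE behavior (acknowledging covers every message delivered so far, and recovering redelivers unacknowledged messages in order), messages are assumed to be `(id, body)` pairs with unique ids, and checkpoints are assumed to carry a sequence number `seq`. One detail is an assumption of this sketch rather than part of the design above: once all the redelivered duplicates have been filtered out, the sketch acknowledges them immediately so that a later reset cannot redeliver them again.

```python
from collections import deque


class FakeClientAckQueue:
    """Hypothetical in-memory queue with CLIENT_ACKNOWLEDGE-like semantics."""

    def __init__(self, messages):
        self.pending = deque(messages)  # (id, body) pairs not yet delivered
        self.unacked = []               # delivered but not yet acknowledged

    def receive(self):
        if not self.pending:
            return None
        msg = self.pending.popleft()
        self.unacked.append(msg)
        return msg

    def acknowledge(self):
        # Acknowledges every message delivered so far.
        self.unacked.clear()

    def recover(self):
        # Redeliver all unacknowledged messages, oldest first.
        self.pending.extendleft(reversed(self.unacked))
        self.unacked.clear()


class JMSSourceSketch:
    """Hypothetical model of the doCheckpoint flag, delayed ack, and dedup."""

    def __init__(self, queue):
        self.queue = queue
        self.do_checkpoint = False
        self.since_ack = []     # ids received since the last acknowledgement
        self.ckpt_seq = None    # sequence number of the last checkpoint taken
        self.ckpt_ids = set()   # ids covered by that checkpoint, not yet acked
        self.duplicates = set() # ids to drop after a reset

    def process_next(self):
        """Return the next message to submit, or None if there is none."""
        if self.do_checkpoint:
            # The whole checkpoint succeeded, so acknowledging is now safe.
            self.queue.acknowledge()
            self.do_checkpoint = False
            self.since_ack.clear()
            self.ckpt_ids.clear()
        msg = self.queue.receive()
        if msg is None:
            return None
        msg_id, _ = msg
        if msg_id in self.duplicates:
            self.duplicates.discard(msg_id)
            if not self.duplicates:
                # All checkpointed messages were seen again: acknowledge them.
                self.queue.acknowledge()
                self.ckpt_ids.clear()
            return None
        self.since_ack.append(msg_id)
        return msg

    def drain(self):
        pass  # no action needed

    def checkpoint(self, seq):
        self.do_checkpoint = True
        self.ckpt_seq = seq
        self.ckpt_ids = set(self.since_ack)

    def reset(self, seq):
        self.do_checkpoint = False
        # Resetting to a checkpoint whose messages were never acknowledged:
        # those messages will be redelivered and must be filtered out.
        self.duplicates = set(self.ckpt_ids) if seq == self.ckpt_seq else set()
        self.since_ack.clear()
        self.queue.recover()

    def reset_to_init(self):
        self.do_checkpoint = False
        self.duplicates = set()
        self.since_ack.clear()
        self.queue.recover()
```

In this sketch `process_next` returns `None` both when the queue is empty and when a duplicate was dropped; a real operator would simply move on to the next message.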

  • JMSSink is stateless and can participate in a consistent region.

MQTT

MQTTSink Operator

Limitations:

  1. The MQTTSink operator cannot be placed at the start of a consistent region.
  2. The MQTTSink operator does not support tuple replay on its control port. The operator issues warning messages to the user about control port behavior.

On Drain: The operator will flush all tuples remaining in its internal buffer to the remote messaging server.

On Checkpoint: The operator saves the state of the qos, serverUri, and topics attributes.

On Reset: The operator restores the attribute state saved in the provided checkpoint, i.e. the previously saved checkpoint.

On ResetToInit: The operator restores the qos, serverUri, and topics values to their initial state.
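The state handling above amounts to snapshotting three attribute values and flushing a buffer on drain. A minimal Python sketch, assuming a hypothetical `publish` callback in place of the real MQTT client and a control port modeled as keyword updates; `MQTTSinkStateSketch` and its methods are illustrative names, not the operator's API.

```python
import copy


class MQTTSinkStateSketch:
    """Hypothetical model of MQTTSink's consistent region state handling."""

    def __init__(self, qos, server_uri, topics, publish):
        self.initial = {"qos": qos, "serverUri": server_uri, "topics": list(topics)}
        self.state = copy.deepcopy(self.initial)
        self.buffer = []        # tuples not yet sent to the server
        self.publish = publish  # stand-in for the real MQTT client

    def on_control(self, **updates):
        # Control port tuples may change qos, serverUri, or topics at runtime.
        self.state.update(updates)

    def submit(self, payload):
        self.buffer.append(payload)

    def drain(self):
        # Flush every buffered tuple to the remote messaging server.
        for payload in self.buffer:
            self.publish(self.state, payload)
        self.buffer.clear()

    def checkpoint(self):
        # Save the current qos, serverUri, and topics values.
        return copy.deepcopy(self.state)

    def reset(self, saved):
        # Restore the attribute values from the given checkpoint.
        self.state = copy.deepcopy(saved)

    def reset_to_init(self):
        # Restore the values the operator started with.
        self.state = copy.deepcopy(self.initial)
```

Deep copies are used so that later control-port updates cannot mutate a saved checkpoint, for example by appending to a shared topics list.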

With this approach, users can expect the following behavior when the MQTTSink operator participates in a consistent region:

  • Messages with qos=1 and qos=2 are delivered to the messaging server at least once. Duplicate tuples are expected. (Does this mean we are dropping support for qos=0? Also, allowing duplicate tuples will break the qos=2 protocol.)

Samantha: No, it does not mean we are dropping support for qos=0. It means that the user must set qos=1 or qos=2 to guarantee at-least-once message delivery from Streams.

  • The MQTTSink operator does not restore control port tuples; users may want to replay control signals after a reset.
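The source of the duplicates can be seen with a toy replay model. It assumes the sink has already published every tuple it received before the failure, and that after the reset the upstream region replays every tuple after the last checkpoint; `simulate_replay` is a hypothetical helper, not toolkit code. Every tuple reaches the server at least once, and the tuples between the checkpoint and the failure arrive twice. MQTT QoS levels apply to each individual publish, so a tuple that is replayed and published again is a new message from the protocol's point of view; qos=2 therefore cannot suppress these duplicates. With qos=0 (at most once), an individual publish can be lost, which is why at-least-once delivery requires qos=1 or qos=2.

```python
def simulate_replay(tuples, checkpoint_after, fail_after):
    """Hypothetical model: return the sequence the MQTT server receives.

    checkpoint_after: number of tuples covered by the last successful checkpoint.
    fail_after: number of tuples published before the failure and reset.
    """
    if not 0 <= checkpoint_after <= fail_after <= len(tuples):
        raise ValueError("expected 0 <= checkpoint_after <= fail_after <= len(tuples)")
    received = list(tuples[:fail_after])        # published before the failure
    received.extend(tuples[checkpoint_after:])  # replayed after the reset
    return received


if __name__ == "__main__":
    print(simulate_replay([1, 2, 3, 4, 5], checkpoint_after=2, fail_after=4))
    # [1, 2, 3, 4, 3, 4, 5]: nothing lost, tuples 3 and 4 duplicated
```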

MQTTSource Operator

  1. This operator cannot participate in a consistent region.
  2. Users should place a ReplayableStart operator after the MQTTSource operator to achieve tuple replay.
  3. Because this operator has an internal buffer, a new attribute will be introduced to let the user control the size of the internal buffer. When a ReplayableStart operator follows MQTTSource, setting the internal buffer size to 1 helps avoid losing tuples on reset.
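The effect of the buffer size can be sketched with a bounded queue. The sketch assumes, as the text implies, that messages still sitting in MQTTSource's internal buffer when a reset occurs are lost, while messages already handed to ReplayableStart can be replayed; it also assumes that a full buffer refuses new messages (back-pressure) rather than dropping them. `BoundedSourceBuffer` is a hypothetical name, not the operator's implementation. Under these assumptions, at most `capacity` messages are at risk at any moment, so a capacity of 1 bounds the loss to a single message.

```python
from collections import deque


class BoundedSourceBuffer:
    """Hypothetical model of MQTTSource's internal buffer with a size limit."""

    def __init__(self, capacity):
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self.capacity = capacity
        self.items = deque()

    def offer(self, message):
        """Accept a message from the MQTT client; refuse it when the buffer is full.

        Refusing rather than dropping is an assumption of this sketch: it models
        back-pressure that leaves the message with the client or broker.
        """
        if len(self.items) >= self.capacity:
            return False
        self.items.append(message)
        return True

    def take(self):
        """Hand the oldest message to ReplayableStart, which can replay it later."""
        return self.items.popleft() if self.items else None

    def lost_on_reset(self):
        """Messages accepted but not yet handed downstream are lost on a reset."""
        lost = list(self.items)
        self.items.clear()
        return lost
```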

XMS

  • XMSSource cannot participate in a consistent region. Customers are expected to use the ReplayableStart operator from the standard toolkit to prevent tuple loss.

  • XMSSink is stateless and can participate in a consistent region.