Databus 2.0 Client Event Model and Consumer API
h1. Definitions
|| Term || Definition ||
| Databus source | An abstraction for a data source in a database that Databus consumers can monitor for changes. You can think of a Databus source as a SQL table or view. |
| Databus relay | A component that serves as a router between the monitored Databus sources and the consumers that monitor for changes in those sources. The relay makes sure that a consumer sees all changes the consumer is interested in, and only those changes. |
| Databus consumer | An object that implements the Databus callback API and listens to a stream of data change events. |
| Databus consumer group | A group of consumers that process a single stream of data change events as a group. |
| Databus client | The Databus-provided library that lets the consumer communicate with one or more Databus relays. |
| Data change event | An object that encapsulates a change to the data associated with a given Databus source key and that is transmitted to the Databus consumers. |
| Consistency window | A sequence of events that preserves the consistency of the data: if one starts from a consistent state of the data and applies all events, one finishes in a consistent state. |
h1. Data Change Events
Data change events encapsulate the changes in the source database for a given primary key. At a high level, a data change event has three parts:
* the key of the changed row,
* the serialized value (payload) of the changed row, and
* the event metadata.
The metadata of an event consists of:
* the SCN (system change number) of the change, and
* the timestamp of the change.
The full API is available in {{DbusEvent.java}}.
h1. Databus Event Types
* StartDataEventSequence – denotes the start of a sequence of data events from an event consistency window.
* EndDataEventSequence – denotes the end of a sequence of data events from an event consistency window.
* StartSource – denotes the start of the data events for a given Databus source within the window.
* EndSource – denotes the end of the data events for a given Databus source within the window.
* DataEvent – carries a single data change event for the current source.
* Rollback – denotes that an irrecoverable error occurred while obtaining the current event consistency window (see the discussion below).
A typical sequence of the above events looks like this:
StartDataEventSequence(startSCN) –> StartSource(Source1) –> DataEvent(Source1.data1) –> … –> DataEvent(Source1.dataN) –> EndSource(Source1) –> StartSource(Source2) –> DataEvent(Source2.data1) –> … –> DataEvent(Source2.dataM) –> EndSource(Source2) –> … –> EndDataEventSequence(endSCN)
Intuitively, the Databus client communicates with the consumer: “Here is the next batch of changes in the watched tables (sources). The changes are broken down by tables. Here are the changes in the first table, then the changes to the next table, etc. All the changes represent the delta from the previous consistent state of the database to the following consistent state.”
The above Databus event sequence is specific for a given consumer. Each consumer specifies an order of Databus sources it is interested in. The Databus client ensures that the data change events for the different sources are always presented to the consumer in that order.
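To make the window/source ordering concrete, here is a minimal, hypothetical Java sketch that replays the callback order the client guarantees. All names here ({{replayWindow}}, the string-based stand-ins for sources and events) are illustrative assumptions, not the actual Databus API.

```java
import java.util.ArrayList;
import java.util.List;

public class SequenceDemo {
    // Collects the callback order for inspection; a real consumer would do work instead.
    private static List<String> log = new ArrayList<>();

    private static void startDataEventSequence(long startScn) { log.add("StartSeq(" + startScn + ")"); }
    private static void startSource(String source)            { log.add("StartSource(" + source + ")"); }
    private static void dataEvent(String source, int i)       { log.add("Data(" + source + "." + i + ")"); }
    private static void endSource(String source)              { log.add("EndSource(" + source + ")"); }
    private static void endDataEventSequence(long endScn)     { log.add("EndSeq(" + endScn + ")"); }

    // Replays one consistency window in the order the client guarantees:
    // sources appear in the consumer-specified order, each bracketed by Start/EndSource.
    public static List<String> replayWindow(long startScn, long endScn,
                                            String[] sources, int eventsPerSource) {
        log = new ArrayList<>();
        startDataEventSequence(startScn);
        for (String s : sources) {
            startSource(s);
            for (int i = 1; i <= eventsPerSource; i++) dataEvent(s, i);
            endSource(s);
        }
        endDataEventSequence(endScn);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(replayWindow(100L, 110L, new String[]{"Source1", "Source2"}, 2));
    }
}
```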
It is a good question to wonder about the order of the DataEvent events for a given source. We have also discussed and considered the addition of a Checkpoint event during the processing of data change events, but decided not to include checkpoint eventing in the Databus 2.0 API at this point. The idea is to allow the consumer to save intermediate state during processing. If the consumer supported such an event and a failure occurred, the client would try to resume the data events from the latest checkpoint. This can be helpful when processing large data event sequences, for example after an update to the database that touched 20% of the rows.
Anywhere during the above event sequence, the client can send a Rollback message, which means that an irrecoverable error occurred while obtaining the current event consistency window. Processing will resume with another window. The new window may start at the same or an earlier SCN to guarantee that no events are missed, but some of the events may be replayed.
The Rollback event allows the Databus client to store minimal state while processing incoming data change events by shifting some of the responsibility for recovery to the consumer. The Databus client will provide a standard capability to buffer incoming events until an EndDataEventSequence event is seen. In this case, the consumer does not have to deal with the Rollback event but some of the available memory will be used by the Databus client for buffering and thus the memory will be unavailable to the consumer.
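The buffering strategy described above can be sketched as follows: hold events until EndDataEventSequence, discard them on Rollback. This is an illustrative assumption of how such a buffer might behave, not the actual Databus client implementation; the class and method names ({{WindowBuffer}}, the {{on*}} methods) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowBuffer {
    private final List<String> pending = new ArrayList<>();    // current (unconfirmed) window
    private final List<String> delivered = new ArrayList<>();  // events handed to the consumer
    private long windowStartScn = -1;

    public void onStartDataEventSequence(long startScn) {
        windowStartScn = startScn;
        pending.clear();
    }

    public void onDataEvent(String event) { pending.add(event); }

    // Rollback: the window is abandoned; processing resumes at the SCN of the
    // latest StartDataEventSequence (or earlier), so buffered events are dropped.
    public long onRollback() {
        pending.clear();
        return windowStartScn;
    }

    // End of a consistency window: now it is safe to flush the whole batch.
    public void onEndDataEventSequence() {
        delivered.addAll(pending);
        pending.clear();
    }

    public List<String> delivered() { return delivered; }
}
```

With this approach the consumer never observes a partial window, at the cost of the memory used by {{pending}}.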
The event state transition is given below.
If the Databus consumer uses the multi-threaded API, each thread will see all StartDataEventSequence, EndDataEventSequence, StartSource, EndSource events and only a portion of the DataEvent events. There is a barrier synchronization across all threads at each StartSource and EndSource event.
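The barrier behavior can be illustrated with {{java.util.concurrent.CyclicBarrier}}: every thread synchronizes at the StartSource and EndSource points, while the DataEvent work is partitioned across threads. This is a simplified sketch of the execution model, not the Databus client's actual threading code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CyclicBarrier;

public class BarrierDemo {
    // Simulates a multi-threaded consumer group: all threads pass a barrier at
    // StartSource and EndSource, but each DataEvent is handled by exactly one thread.
    public static int consume(int numThreads, int numEvents) throws Exception {
        CyclicBarrier startSource = new CyclicBarrier(numThreads);
        CyclicBarrier endSource = new CyclicBarrier(numThreads);
        List<Integer> handled = Collections.synchronizedList(new ArrayList<>());

        Thread[] threads = new Thread[numThreads];
        for (int t = 0; t < numThreads; t++) {
            final int id = t;
            threads[t] = new Thread(() -> {
                try {
                    startSource.await();                     // barrier: all threads see StartSource
                    for (int e = id; e < numEvents; e += numThreads) {
                        handled.add(e);                      // this thread's share of DataEvents
                    }
                    endSource.await();                       // barrier: all threads see EndSource
                } catch (Exception ex) {
                    throw new RuntimeException(ex);
                }
            });
            threads[t].start();
        }
        for (Thread th : threads) th.join();
        return handled.size();                               // every event handled exactly once
    }

    public static void main(String[] args) throws Exception {
        System.out.println(consume(3, 10));
    }
}
```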
h2. Databus event types
h3. DataEventSequence event type
The event type is used for [StartDataEventSequence|#DatabusEventTypes], [EndDataEventSequence|#DatabusEventTypes] and [Rollback|#DatabusEventTypes] Databus events.
{code:title=com.linkedin.databus2.api.events.DataEventSequenceEvent|borderStyle=solid}
interface DataEventSequenceEvent
{
  SCN getScn();
}
{code}
For StartDataEventSequence, the SCN is the first SCN in the data event sequence. For EndDataEventSequence, the SCN is the last SCN in the data event sequence. For Rollback, the SCN is the SCN to which the rollback is being done, i.e. the SCN from the latest StartDataEventSequence event.
{anchor:SourceEvent}
h3. Source event type
The event type is used for both [StartSource|#DatabusEventTypes] and [EndSource|#DatabusEventTypes] Databus events.
{code:title=com.linkedin.databus2.api.events.SourceEvent|borderStyle=solid}
interface SourceEvent
{
  Source getDatabusSource();
  Schema getDataEventSchema();
}
{code}
{anchor:DataEvent}
h3. DataEvent event
{code:title=com.linkedin.databus2.api.events.DataEventEvent|borderStyle=solid}
interface DataEventEvent
{
  SCN getScn();
  long getTimestamp();
  byte[] getKeyBytes();
  byte[] getValueBytes();
  long getLongKey();
  String getStringKey();
  <V extends SpecificRecord> V getTypedValue(V reuse, Class<V> targetClass, Schema sourceSchema);
}
{code}
{note}Note on schema management: currently, webtrack 2.0 returns generic records rather than specific records, so we get to choose the API. Chavdar Botev did a simple test with the implementation below and it seems to work.
{code:borderStyle=solid}
public static <T extends SpecificRecord> T readRecordJson(byte[] recBytes, T dest,
                                                          Class<T> aClass, Schema sourceSchema)
    throws Exception
{
  T reuse = null != dest ? dest : aClass.newInstance();
  JsonDecoder jsonDec = new JsonDecoder(sourceSchema, new ByteArrayInputStream(recBytes));
  ResolvingDecoder resDec = new ResolvingDecoder(sourceSchema, reuse.getSchema(), jsonDec);
  SpecificDatumReader<SpecificRecord> reader = new SpecificDatumReader<SpecificRecord>(reuse.getSchema());
  return aClass.cast(reader.read(reuse, resDec));
}
{code}
The user will have to pass in the source schema (as obtained from the SourceEvent), or we will have to store it as part of {{DataEventEvent}}.{note}
h1. Databus Consumer API
Unlike Databus v1.0, the Databus 2.0 events listener is not a generic class. This makes it easier to write listeners that listen to events from multiple sources. As described in [DataEvent|#DataEvent], such listeners can use {{DataEventDeserializer<K,V>}} to get typed data.
h3. DatabusEventListener
Listener for [StartDataEventSequence|#DatabusEventTypes] and [EndDataEventSequence|#DatabusEventTypes] events using the [DataEventSequenceEvent|#DataEventSequenceEvent] event type.
{code:title=com.linkedin.databus2.api.events.DatabusEventListener|borderStyle=solid}
interface DatabusEventListener
{
  void startDataEventSequence(DataEventSequenceEvent e);
  void endDataEventSequence(DataEventSequenceEvent e);
  void rollback(DataEventSequenceEvent e);
  void startSource(SourceEvent e);
  void endSource(SourceEvent e);
  boolean dataEvent(DataEventEvent e);
}
{code}
An alternate interface that avoids the need to construct "additional Databus event objects" (does this mean sharing event objects across "rows", or what is meant by this statement?) is:
{code:title=com.linkedin.databus2.api.events.DatabusEventListener|borderStyle=solid}
interface DatabusEventListener
{
  void startDataEventSequence(SCN startScn);
  void endDataEventSequence(SCN endScn);
  void rollback(SCN startScn);
  void startSource(Source source, Schema sourceSchema);
  void endSource(Source source, Schema sourceSchema);
  boolean dataEvent(DataEventEvent e);
}
{code}
If a listener returns {{false}} from {{dataEvent(DataEventEvent e)}}, the Databus client stops the event-processing loop. Consumers should try to handle all exceptions in their own logic; if the Databus client encounters an exception from the consumer, it will stop the event-processing loop.
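A minimal sketch of such a processing loop, assuming a consumer callback that can veto further processing by returning {{false}} or by throwing. The names ({{EventLoop}}, {{dispatch}}) are hypothetical, not part of the Databus API.

```java
import java.util.Iterator;
import java.util.function.Predicate;

public class EventLoop {
    // Dispatches events to the consumer until it returns false, throws,
    // or the event stream is exhausted. Returns the number of events processed.
    public static <E> int dispatch(Iterator<E> events, Predicate<E> consumer) {
        int processed = 0;
        while (events.hasNext()) {
            E e = events.next();
            try {
                if (!consumer.test(e)) break;   // consumer vetoed: stop the loop
            } catch (RuntimeException ex) {
                break;                          // consumer exception: stop the loop
            }
            processed++;
        }
        return processed;
    }
}
```

Note that an event on which the consumer returns {{false}} or throws is not counted as processed, matching the "stop the event-processing loop" semantics above.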
We may add further configuration options to the client for retrying, along with retry semantics.
h1. Execution Model
{{onStartDataEventSequence}}, {{onEndDataEventSequence}}, {{onCheckpoint}}, {{onError}}, and {{onEndSource}} have barriers before the invocation and after the completion of the callback.
Here is a sample distribution of callbacks across a group of 3 consumers, a single consumer and a group of 2 consumers registered for the same source(s). We have illustrated only a single event window with a single source and 4 data events.
!http://viewvc.corp.linkedin.com/viewvc/netrepo/databus2/trunk/doc/user_docs/DatabusExecModel.png?view=co|width=50%!
h1. Parallel Consumption