Skip to content

Databus 2.0 Client Event Model and Consumer API

phanindraganti edited this page Jan 16, 2013 · 8 revisions

h1. Definitions

Term Definition
Databus source An abstraction for a data source in a database that Databus consumers can monitor for changes. You can think of the Databus source as a SQL table or view.
Databus relay A component that serves as a router between the monitored Databus sources and the consumers which monitor for changes in the sources. The relay makes sure that a consumer sees all changes the consumer is interested in and only those changes.
Databus consumer An object that implements the Databus callback API and listens to a stream of data events
Databus consumer group A group of consumers that process a single stream of data events as a group
Databus client The Databus provided library that lets the consumer communicate with one or more Databus relays.
Data change event An object that encapsulates the change of the data associated with a given Databus source key that is transmitted to the Databus consumers
Consistency window A sequence of events that preserve the consistency of the data, i.e. if one started from a consistent state of the data and applied all events, they will finish in a consistent state.

h1. Data Change Events

Data change events encapsulate the changes in the source database for a given primary key. At a high-level a data change event has three parts:

Meta data

Key

Data

The meta data of an event consists of:

opcode - specifies if the change is an UPSERT (i.e. update or insert) or DELETE

sequence - specifies the sequence number of event in the update timeline of the event

The full API is available here [DataChangeEvent|http://viewvc.corp.linkedin.com/viewvc/netrepo/databus2/trunk/databus-core/databus-core-impl/src/main/java/com/linkedin/databus/core/DataChangeEvent.java?view=markup].

h1. Consumers and Consumer Groups

Consumers

h1. Databus Event Types:

StartDataEventSequence - denotes the start of a sequence of data events from an events consistency window.

StartSource - denotes the start of data events from the same Databus source

DataEvent - denotes a data change event for the current Databus source

EndSource - denotes the end of data events from the same Databus source

EndDataEventSequence - denotes the end of a sequence of data events with the same SCN

Rollback - an irrecoverable error occurred while obtaining the current event consistency window

A typical sequence of the above events can look like.

{noformat} StartDataEventSequence(startSCN) –> StartSource(Source1) –> DataEvent(Source1.data1) –> … –> DataEvent(Source1.dataN) –> EndSource(Source1) –> StartSource(Source2) –> DataEvent(Source2.data1) –> … –> DataEvent(Source2.dataM) –> EndSource(Source2) –> … –> EndDataEventSequence(endSCN) {noformat}

Intuitively, the Databus client communicates with the consumer: “Here is the next batch of changes in the watched tables (sources). The changes are broken down by tables. Here are the changes in the first table, then the changes to the next table, etc. All the changes represent the delta from the previous consistent state of the database to the following consistent state.”

The above Databus event sequence is specific for a given consumer. Each consumer specifies an order of Databus sources it is interested in. The Databus client ensures that the data change events for the different sources are always presented to the consumer in that order.

{note}

{note}

Anywhere during the above event sequence, the client can send a Rollback message which means that an irrecoverable error occurred while obtaining the current event consistency window. The processing will resume with another window. The new window may start the same or earlier SCN to guarantee that no events are missed but some of the events may be replayed.

The Rollback event allows the Databus client to store minimal state while processing incoming data change events by shifting some of the responsibility for recovery to the consumer. The Databus client will provide a standard capability to buffer incoming events until an EndDataEventSequence event is seen. In this case, the consumer does not have to deal with the Rollback event but some of the available memory will be used by the Databus client for buffering and thus the memory will be unavailable to the consumer.

The event state transition is given below.

!http://viewvc.corp.linkedin.com/viewvc/netrepo/databus2/trunk/doc/user_docs/DatabusV2Events.png?view=co|width=50%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%!

If the Databus consumer uses the multi-threaded API, each thread will see all StartDataEventSequence, EndDataEventSequence, StartSource, EndSource events and only a portion of the DataEvent events. There is a barrier synchronization across all threads at each StartSource and EndSource event.

!DatabusV2EventsMT.png|thumbnail!

h2. Databus event types

{anchor:DataEventSequence}

h3. DataEventSequence event type

The event type is used for [StartDataEventSequence|#DatabusEventTypes], [EndDataEventSequence|#DatabusEventTypes] and [Rollback|#DatabusEventTypes] Databus events.

{code:title=com.linkedin.databus2.api.events.DataEventSequenceEvent|borderStyle=solid} interface DataEventSequenceEvent { SCN getScn(); } {code}

For StartDataEventSequence, the SCN is the first SCN in the data event sequence. For EndDataEventSequence, the SCN is the last SVN in the data event sequence. For Rollback, the SCN is the SCN to which the rollback is being done, i.e. the SCN from the latest StartDataEventSequence event.

{anchor:SourceEvent}

h3. Source event type

The event type is used for both [StartSource|#DatabusEventTypes] and [EndSource|#DatabusEventTypes] Databus events.

{code:title=com.linkedin.databus2.api.events.SourceEvent|borderStyle=solid} interface SourceEvent { Source getDatabusSource(); Schema getDataEventSchema(); } {code}

{anchor:DataEvent}

h3. DataEvent event

{code:title=com.linkedin.databus2.api.events.DataEventEvent|borderStyle=solid} interface DataEventEvent { SCN getScn(); long getTimestamp(); byte[] getKeyBytes(); byte[] getValueBytes(); long getLongKey(); String getStringKey(); <V> V getTypedValue(V reuse, Class<V extends SpecificRecord> targetClass, Schema sourceSchema); } {code}

{note} Note on schema management Currently, webtrack 2.0 returns generic records rather than specific records. So we get to chose the API. Chavdar Botev did a simple test with the implementation below and it seems to work.

{code:borderStyle=solid} public static <T extends SpecificRecord> T readRecordJson(byte[] recBytes, T dest, Class<T> aClass, Schema sourceSchema) throws Exception { T reuse = null != dest ? dest : aClass.newInstance(); JsonDecoder jsonDec = new JsonDecoder(sourceSchema, new ByteArrayInputStream(recBytes)); ResolvingDecoder resDec = new ResolvingDecoder(sourceSchema, reuse.getSchema(), jsonDec); SpecificDatumReader<SpecificRecord> reader = new SpecificDatumReader<SpecificRecord>(reuse.getSchema()); return aClass.cast(reader.read(reuse, resDec)); } {code}

The user will have to pass in the source schema (as obtained from the SourceEvent) or we have to store this as part of {{DataEventEvent}}. {note}

h1. Databus Consumers API

Unlike Databus v1.0, the Databus 2.0 events listener is not a generic class. This makes it easier to write listeners that listen to events from multiple sources. As described in [DataEvent|#DataEvent], such listeners can use {{DataEventDeserializer<K,V>}} to get typed data.

h3. DatabusEventListener

Listener to [StartDataEventSequence|#DatabusEventTypes] and [EndDataEventSequence|#DatabusEventTypes] events using [DataEventSequenceEvent|#DataEventSequenceEvent] event type.

{code:title=com.linkedin.databus2.api.events.DataEventSequenceEventListener|borderStyle=solid} interface DatabusEventListener { void startDataEventSequence(DataEventSequenceEvent e); void endDataEventSequence(DataEventSequenceEvent e); void rollback(DataEventSequenceEvent e); void startSource(SourceEvent e); void endSource(SourceEvent e); boolean dataEvent(DataEventEvent e); } {code}

Alternate interface that avoids to need to construct “additional Databus event objects” (does this mean sharing event objects across “rows” or what is meant by this statement?) is

{code:title=com.linkedin.databus2.api.events.DataEventSequenceEventListener|borderStyle=solid} interface DatabusEventListener { void startDataEventSequence(SCN startScn); void endDataEventSequence(SCN endScn); void rollback(SCN startScn); void startSource(Source source, Schema sourceSchema); void endSource(SourceEvent source, Schema sourceSchema); boolean dataEvent(DataEventEvent e); } {code}

If a listener returns {{false}} to {{dataEvent(DataEventEvent e)}}, the Databus client has to stop the event processing loop. Consumer should try to process all exceptions in their logic. If the Databus client encounters an exception from the consumer, it will stop the event processing loop.

We may add further configuration options to the client to retry along with Retry semantics:

Retry the failing DataEvent

Retry the failing DataEvent and all subsequent events.

Retry all DataEvents from this source

Retry all DataEvents from the event consistency window.

h1. Execution Model

Each consumer callback receives all of the following callbacks: {{on

{{onDataEvent}} callbacks are distributed across consumers in a group

{{onStartDataEventSequence}}, {{onEndDataEventSequence}}, {{onCheckpoint}}, {{onError}}, {{onEndSource}} have barriers before the invocation and after the completion of the callback

{{onStartSource)) has a barrier before the invocation of the callback

{{onRollback}} has a barrier after the completion of the callback

Here is a sample distribution of callbacks across a group of 3 consumers, a single consumer and a group of 2 consumers registered for the same source(s). We have illustrated only a single event window with a single source and 4 data events.

!http://viewvc.corp.linkedin.com/viewvc/netrepo/databus2/trunk/doc/user_docs/DatabusExecModel.png?view=co|width=50%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%!

h1. Parallel Consumption