\chapter{Introduction to Event Streaming}
\label{chap:event-streaming}
In the previous chapter we defined what a message broker is, so one can now imagine
what systems like Apache Kafka are basically intended for. Because Apache Kafka is
built for big data environments, concepts such as events and streams also become
important. To understand the possible use cases of Apache Kafka, the terms around
event streaming need to be clarified.
\begin{figure}[H]
\centering
\includegraphics[width=0.4\textwidth]{images/evenstreaming-intro.png}
\caption{Event Streaming terms}
\label{fig:evenstreaming-intro}
\end{figure}
\section{Purpose}
The view of data as rows in databases or as single files changes when one thinks
about what a business actually does with the data generated in large distributed
systems. A retailer generates orders that lead to sales, shipments and so on; a
financial institution generates orders that have an impact on a stock price; a
social network platform generates clicks, impressions and searches that are used
for intelligent analysis in order to display personalized content to its users.
Such kinds of data can be thought of as streams of events. In fact, collecting all
events a business has ever generated leads to the current state of the business
and thus describes what the business did in the past. For example, the current
price of a stock is the result of all orders ever made on this stock. Every order
can be captured as an event, and all events together can reproduce the current
stock price.
\section{What is an Event?}
\label{intro-datastream-datastream}
Very basically, an event occurs when ``something happens'' in a system, for
example when a user of an online shop adds an item to his basket. In modern
systems, events are transmitted as discrete messages on a MOM
(\ref{intro-messaging-mom}) and thus, following Tanenbaum et al. (2006), represent
the data units of a data stream. While a data stream can carry discrete as well as
continuous media, events are transmitted as discrete messages only. The message
itself can be considered an event message. \cite{EIP03}
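
A minimal illustration of such an event message, sketched in plain Java with
hypothetical names (not tied to any particular MOM): a record of something that
happened, carried as a discrete unit with a type, a timestamp and a payload.
\begin{verbatim}
import java.time.Instant;
import java.util.Map;

// Minimal sketch of an event message (hypothetical names): an immutable
// record of "something that happened", carried as a discrete unit.
public final class EventMessage {
    private final String type;                 // e.g. "ItemAddedToBasket"
    private final Instant occurredAt;          // when the event happened
    private final Map<String, String> payload; // event data as key/value pairs

    public EventMessage(String type, Instant occurredAt,
                        Map<String, String> payload) {
        this.type = type;
        this.occurredAt = occurredAt;
        this.payload = Map.copyOf(payload);    // keep the event immutable
    }

    public String type() { return type; }
    public Instant occurredAt() { return occurredAt; }
    public Map<String, String> payload() { return payload; }
}

// Usage, matching the online-shop example above:
// new EventMessage("ItemAddedToBasket", Instant.now(),
//                  Map.of("user", "42", "item", "978-0134685991"));
\end{verbatim}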
Thinking more traditionally, even database systems can be regarded as event-based
systems. The process of creating backups in the form of dumps does not scale as
the frequency of dumps increases: not only does the process take longer depending
on the size of the database, system resources are also tied up while it runs. A
more efficient approach is change capture, which records the difference between
the state of the affected database rows before and after a change as an event.
Done continuously, this leaves a sequence of row changes, which is in fact a
stream of events.
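
The following sketch (plain Java, hypothetical field names) illustrates what such
a change-capture event could look like: the state of an affected row before and
after the change, captured as a single event.
\begin{verbatim}
import java.util.Map;

// Minimal sketch of a change-capture event (hypothetical names): the state
// of an affected row before and after the change, captured as one event.
public final class RowChangeEvent {
    public enum Operation { INSERT, UPDATE, DELETE }

    private final String table;
    private final Operation op;
    private final Map<String, Object> before; // empty for INSERT
    private final Map<String, Object> after;  // empty for DELETE

    public RowChangeEvent(String table, Operation op,
                          Map<String, Object> before, Map<String, Object> after) {
        this.table = table;
        this.op = op;
        this.before = Map.copyOf(before);
        this.after = Map.copyOf(after);
    }
    // A continuous sequence of such events, one per committed row change,
    // is exactly the stream of events described above.
}
\end{verbatim}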
Systems which interact with events basically have the following key objectives:
\begin{itemize}
\item {\bf Data Analytics}: \hfill \\ Preparing collected data for further analysis at any time.
\item {\bf Data Integration}: \hfill \\ Making all the generated data of an
organization available in all its services and systems.
\end{itemize}
\section{Processing of Events (Data Analytics)}
Data streams consisting of events are not valuable by themselves; the value comes
from a system that processes these events, produces a result and provides it to
other services. This can be the calculation of the new stock price after a
customer sold his stock, or the personalized content on a news feed after a person
subscribed to a new fan page. It could also be a more complex analysis over all
the collected events that ever happened, stored in a big database.
\\ \\
In fact, the above-mentioned examples differ in nature. The calculation of the
stock price is fairly simple: the price is set to the latest one paid, without any
knowledge of past stock prices. A complete analysis of a huge database, on the
other hand, not only requires a significant amount of processing time, it also
requires data produced in the past. This leads to two different approaches to
handling an incoming event stream of any size:
\begin{description}
\item[Store raw data] \hfill \\
{Simply store every single event in a big data store. By appending every incoming
event one gets a detailed history of every activity in the system. To analyze the
data, a periodic batch process can execute big queries over all events to produce
a result. $ \Rightarrow $ \textbf{Batch Processing}}
\item[Store aggregated data] \hfill \\
{Instead of persisting every single event, process the incoming data stream
directly and store only an aggregated summary. Because the aggregate is updated
with every incoming event, an overall result is available very fast (which we call
``real-time''). $ \Rightarrow $ \textbf{Stream Processing} (a sketch contrasting
the two approaches follows below)}
\end{description}
\cite{TalkKleppmann}
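
To make the distinction concrete, the following sketch (plain Java, hypothetical
names, counting page views per user) contrasts the two approaches: the batch
variant appends raw events and answers queries by scanning everything, while the
streaming variant updates a running aggregate on every incoming event.
\begin{verbatim}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical example: counting page views per user.
class PageView { final String user; PageView(String user) { this.user = user; } }

// Batch style: store every raw event, answer queries with a full scan later.
class BatchStore {
    private final List<PageView> rawEvents = new ArrayList<>();
    void append(PageView e) { rawEvents.add(e); }        // cheap write
    Map<String, Long> viewsPerUser() {                   // expensive periodic query
        Map<String, Long> result = new HashMap<>();
        for (PageView e : rawEvents) result.merge(e.user, 1L, Long::sum);
        return result;
    }
}

// Stream style: keep only the aggregate, update it on every incoming event.
class StreamAggregator {
    private final Map<String, Long> viewsPerUser = new HashMap<>();
    void onEvent(PageView e) { viewsPerUser.merge(e.user, 1L, Long::sum); }
    Map<String, Long> viewsPerUser() { return viewsPerUser; } // always up to date
}
\end{verbatim}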
\subsection{Batch Processing}
\label{intro-datastream-batchprocessing}
Traditional batch processing systems are nowadays divided into map-reduce-based
and non-map-reduce-based systems. The process of data integration (a.k.a.\ data
extraction-transformation-load, ETL) runs at a regular time interval, such as
daily, weekly or monthly. Analyzing data that resides in a data store becomes
challenging as the data size grows, and systems may no longer be able to deliver
results within a reasonable time limit. \cite{Liu:2014:SRP:2628194.2628251}
%\subsection{Real-time Batch Processing}
As the trend shows, the performance and responsiveness demanded in a big data
environment can no longer be fulfilled with traditional batch processing alone.
Instead, real-time processing becomes more important than ever in order to obtain
query results within minutes or even seconds.
\cite{bange2013big}
In a real-time batch processing fashion, systems address the data integration
stage with a continuous input of data, while the data analytics stage processes it
in near real-time to present results within seconds. Thus, real-time batch
processing gives organizations the ability to take immediate action in situations
where acting within seconds or minutes matters.
\cite{PrpSvyOfDSPS}
\subsection{Stream Processing}
\label{intro-datastream-streamprocessing}
Stream processing refers to the integration and processing of data before it is
stored. A stream processing system is built out of multiple units called
processing elements (PEs). Each PE receives input from its input queue, performs
some computation on the input using its local state and produces output to its
output queue. PEs always communicate with other PEs through messaging.
\\ \\
Most importantly, such systems are optimized for low latency and high
availability. Recovering from failures is critical for stream processing systems
and should be fast and efficient. For large volumes, data should be partitioned
and handled in parallel; the partitioning strategy of a system determines how it
processes data in parallel and how it can scale.
\cite{PrpSvyOfDSPS}
\\ \\
Stream processing frameworks -- such as Storm, Samza, or Spark Streaming -- were
developed specifically to provide rich processing primitives and can thus be taken
advantage of in the data integration and processing stages.
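
As an illustration of the PE model described above, here is a minimal sketch in
plain Java (hypothetical names): a PE that consumes events from its input queue,
maintains local state and emits results to its output queue.
\begin{verbatim}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

// Minimal sketch of a processing element (PE):
// input queue -> computation on local state -> output queue.
// Hypothetical example: counting events per key and emitting the new count.
class CountingPE implements Runnable {
    private final BlockingQueue<String> input;                    // incoming keys
    private final BlockingQueue<Map.Entry<String, Long>> output;  // (key, count)
    private final Map<String, Long> localState = new HashMap<>();

    CountingPE(BlockingQueue<String> input,
               BlockingQueue<Map.Entry<String, Long>> output) {
        this.input = input;
        this.output = output;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String key = input.take();                         // receive
                long count = localState.merge(key, 1L, Long::sum); // update state
                output.put(Map.entry(key, count));                 // emit result
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();                    // clean shutdown
        }
    }
}
\end{verbatim}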
\subsection{Lambda Architecture}
While batch processing is used for data analysis to get results out of huge
amounts of stored raw data at any time, stream processing reacts to events in
real time. Both approaches are very useful in different use cases. Lambda
architecture is a data-processing architecture designed to handle massive
quantities of data by taking advantage of both batch and stream processing
methods. It is split into three layers: the batch layer, the serving layer and
the speed layer:
\begin{figure}[H]
\centering
\includegraphics[width=0.8\textwidth]{images/lambda-architecture.jpg}
\caption{Lambda Architecture \cite{lambda-architecture}}
\label{fig:lambda-Architecture}
\end{figure}
Any query is answered through the serving layer, which queries both the speed and
the batch layer. While the batch layer periodically computes views over the
collected data -- views that are already outdated by the time the computation
finishes -- the speed layer closes this gap by constantly processing the most
recent data in near real-time fashion. \cite{marz2015big} \cite{PrpSvyOfDSPS}
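
A minimal sketch of the serving layer's merge step (plain Java, hypothetical
names, assuming per-key counts as the precomputed view): a query combines the
latest batch view with the speed layer's aggregate over the data that has arrived
since the last batch run.
\begin{verbatim}
import java.util.Map;

// Minimal sketch of a lambda-architecture serving layer (hypothetical names).
// The batch view is recomputed periodically; the speed view covers only the
// events that arrived after the last batch run.
class ServingLayer {
    private volatile Map<String, Long> batchView = Map.of(); // replaced per batch run
    private final Map<String, Long> speedView;               // kept by speed layer

    ServingLayer(Map<String, Long> speedView) {
        this.speedView = speedView;
    }

    void publishBatchView(Map<String, Long> freshView) {     // called by batch layer
        this.batchView = Map.copyOf(freshView);
    }

    long query(String key) {                                  // merge both layers
        return batchView.getOrDefault(key, 0L) + speedView.getOrDefault(key, 0L);
    }
}
\end{verbatim}
In a real system the speed view would also have to be pruned once the batch view
has caught up; the sketch omits this housekeeping.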
\subsection{Requirements of Event Processing Systems}
Any system which depends on a continuous input of event data requires a delivery
system that can provide data constantly as a stream. Stream processing systems,
whether part of a lambda architecture or not, obviously fulfill this requirement.
On the other hand, in a big data environment batch processing systems
(\ref{intro-datastream-batchprocessing}) also need to be served with continuous
data. In a lambda architecture this could be done using a stream processing
framework (\ref{intro-datastream-streamprocessing}) responsible for serving the
batch processing system with integrated data, ready for data analysis. Another way
of doing so would be a data store -- such as the Hadoop Distributed File System
(HDFS) -- from which data can be taken directly for further analysis.
The same need for a data store holds for any other business intelligence system,
with the added problem that such a data store -- due to the lack of an adapter --
may not be servable by a stream processing system as comfortably as HDFS.
Considering the two types of event processing -- stream and batch processing --
together, data integration turns out to be a stage common to both. Even further,
the data integration stage is present for every system an organization operates,
such as the data warehouse, full-text search indexes or caches. Beyond that,
stream processing systems require a continuous incoming stream to integrate and
process, whereas batch processing systems demand a given persistent set of data to
integrate and analyze.
Furthermore, low latency is an essential requirement for any stream processing
system. For database systems combined with stream processing, reliability becomes
particularly important in order to handle critical updates such as the replication
discussed above. For batch processing, however, low latency matters less than the
availability of well-integrated data and a high throughput, so that large volumes
of data can be handled in as short a time range as possible.
\begin{table}[H]
\centering
\begin{tabular}{l|c|c}
                 & \textbf{Stream Processing} & \textbf{Batch Processing} \\ \hline
Data Integration & x & x \\
Continuous Data  & x &   \\
Persistent Data  &   & x \\
Low Latency      & x &   \\
High Throughput  &   & x
\end{tabular}
\caption{Requirements of Batch and Stream Processing Systems}
\label{table:requirements-batch-stream}
\end{table}
\section{Event Stream (Data Integration)}
The need for a constant input of event data was discussed as a general requirement
for event processing (see table \ref{table:requirements-batch-stream}). A
technology or system which provides this constant flow of events from any system
to event processing is part of the data integration and, in a highly abstract
manner, is called an event stream.
\subsection{Mesh of Streams}
Note that the event processing systems mentioned above do not only consume data
for integration and processing, they can also produce output. Thus, many systems
are both sources and destinations for data transfer, so that a system would then
need two channels per connected system. Obviously, each output in this
constellation is again intended to be consumed by 1--N systems. Connecting all of
these would lead to building a custom channel between each pair of systems (see
Figure \ref{fig:datapipeline_complex}): with $N$ systems acting as both sources
and destinations, up to $N(N-1)$ point-to-point channels are needed -- for ten
systems that is already 90 channels. As the set of systems in an organization
grows, this clearly becomes a challenge to maintain.
\begin{figure}[H]
\centering
\includegraphics[width=0.6\textwidth]{images/datapipeline_complex.png}
\caption{Complex Point-To-Point Architecture}
\label{fig:datapipeline_complex}
\end{figure}
In this scenario several hurdles arise. The process of data integration has to be
done for each system pair individually. An obvious simplification would be to
introduce an organization-wide standardization of the data format; the integration
process then becomes significantly easier, but still has to be done redundantly.
Another remaining problem is the tight coupling between systems: a simple change
to one system can directly affect one or more of its connected systems, which is
not only hard to manage but also reduces the flexibility for further development
of the landscape. Extending the landscape is likely to require touching existing
systems, which is not only time-intensive but also carries the risk of failures.
In fact, these are the known issues of the Point-To-Point channels of traditional
messaging described in Section \ref{intro-messaging-pointtopoint}.
\subsection{Central Platform as Solution}
\label{intro-datastream-centralplatform}
A more elegant and reliable approach to solving the mentioned problems is to
introduce a central platform that supports both batch and real-time consumption
and is thus able to fulfill the requirements given in table
\ref{table:requirements-batch-stream}. Such a system can act as a single data
repository for isolated consumers and gives access to any data that is required,
as shown in figure \ref{fig:datapipeline_simple}. It aggregates incoming events
and represents an event stream for any of its consumers.
\begin{figure}[H]
\centering
\includegraphics[width=0.6\textwidth]{images/datapipeline_simple.png}
\caption{Architecture with a centralized event stream platform}
\label{fig:datapipeline_simple}
\end{figure}
In this constellation, integrating a new data system -- be it a data source or a
data destination -- is fairly simple: connecting it to a single channel attached
to the central platform is all that needs to be done. Besides loosely coupling the
components, this architecture makes it possible to centralize the process of data
integration by doing it in a standardized way, directly within the stream. Thus, a
large factor of complexity across the system landscape is removed. In fact, this
opens up a whole new set of possibilities for organizational scalability.
\newpage
\section{Link to Message Brokers}
As a solution for a modern big data environment with a huge amount of event data
that needs to be processed by different systems, we need a platform which acts as
a mediator, handling the incoming events as streams and providing them to several
consumers (data integration). This is very similar to the definition of a message
broker (\ref{intro-messaging-broker}), and indeed the two concepts can be
compared. Actually, an event stream can be realised with an underlying message
broker: events can be considered as messages with the event data as payload, and
the central stream platform is nothing else than a message broker which handles
incoming events, persists them in an ordered queue and provides them to its
consumers, which can be stream or batch processing systems.
\begin{figure}[H]
\centering
\includegraphics[width=0.8\textwidth]{images/messaging-vs-streaming.png}
\caption{Event Stream vs Messaging}
\label{fig:messaging-vs-streaming}
\end{figure}
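
To make this concrete, the following minimal sketch (plain Java, hypothetical
names) shows the core idea of such a platform: an append-only, ordered log of
event messages from which both a stream consumer (polling from its current
offset) and a batch consumer (reading a whole range at once) can be served.
\begin{verbatim}
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the central platform's core: an append-only, ordered log.
// Stream consumers poll from their own offset; batch consumers read a range.
// A real broker would persist the log and partition it for scalability.
class EventLog {
    private final List<String> log = new ArrayList<>();

    synchronized long append(String event) {        // producers append events
        log.add(event);
        return log.size() - 1;                      // offset of the new event
    }

    synchronized List<String> readFrom(long offset) {        // stream consumer
        return new ArrayList<>(log.subList((int) offset, log.size()));
    }

    synchronized List<String> readRange(long from, long to) { // batch consumer
        return new ArrayList<>(log.subList((int) from, (int) to));
    }
}
\end{verbatim}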
Whether a message broker implementation is suited for a big data event stream
environment depends on its guarantees and performance. In comparison to a standard
integration platform, where a few applications communicate with each other, the
main challenge for a message broker in a big data environment is the very large
amount of data which needs to be handled. The main demands are high throughput and
reliability in case of failures.
\subsection{Example Implementations}
The following implementations give an overview of existing message broker systems.
In chapter \ref{intro-kafka} we will then examine the capabilities of Apache Kafka
with regard to acting as a centralized event stream platform.
\begin{description}
\item [RabbitMQ] \hfill \\
{
RabbitMQ is written in Erlang and its source code is released under the
Mozilla Public License. It supports many messaging protocols, the most important
of which are STOMP (Streaming Text Oriented Messaging Protocol) and AMQP
(Advanced Message Queuing Protocol). Incoming messages are placed in a queue and
can be stored either in memory or on disk. The latter model of persistence
under-performs when a large number of queues need to access the disk
simultaneously. \cite{rabbitmq}
In a RabbitMQ cluster, queues are created on and live in a single node, and all
nodes know about all the queues. When a node receives a request for a queue that
does not live on that node, it routes the request to the node that holds the
queue. To provide high availability (HA), RabbitMQ replicates messages between a
master and a mirrored slave, so the slave can take over if the master has died.
\cite{wickramarachchi2012andes}
What RabbitMQ clustering does not provide is a guarantee against message loss:
when a cluster node dies, the messages in queues on that node can disappear,
because RabbitMQ does not replicate queue contents throughout the cluster; they
live only on the node that owns the queue. \cite{videla2012rabbitmq}
}
\item [ActiveMQ] \hfill \\
{
Apache ActiveMQ is an open source message broker written in Java, shipped
together with a full Java Message Service (JMS) client.
Messages are stored in a data log (called the journal), which consists of
individual files. When all messages in a data log have been successfully
consumed, the data log file is marked as ready to be deleted -- or archived -- in
a periodic cleanup cycle. At intervals the journal can be copied to a long-term
persistence store (e.g.\ JDBC). Multiple broker nodes cannot share a journal;
each must be configured with its own. The journal cannot be used to recover
messages, however, as it does not keep an ordered index, so the long-term store
is used to recover durable subscriptions. Additionally, a separate in-memory
reference store holds references to the messages residing in the data log to
improve performance.
In a clustered long-term persistence setup, Apache ZooKeeper is used to pick a
master from a set of broker nodes configured to replicate a LevelDB store. The
master then keeps all slave LevelDB stores synchronized and up to date by
replicating all updates to them.
ActiveMQ preserves the order of messages sent by a single producer to all
consumers on a topic. If there is a single consumer on a queue, the order of
messages sent by a single producer is preserved as well. However, due to
multi-threading and asynchronous processing, messages from different producers
could arrive at different consumers in different orders. (A minimal JMS producer
sketch follows this list.)
\cite{activemq}
}
\end{description}
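
As a brief illustration of the JMS client mentioned for ActiveMQ, the following
sketch sends a single event message to a queue. It is a minimal example only,
assuming an ActiveMQ broker listening on \texttt{tcp://localhost:61616} and a
queue named \texttt{events}; both are placeholders.
\begin{verbatim}
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

// Minimal JMS producer sketch: sends one event message to an ActiveMQ queue.
// Broker URL and queue name are placeholder assumptions.
public class JmsEventProducer {
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory =
                new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        try {
            connection.start();
            Session session =
                    connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue("events");
            MessageProducer producer = session.createProducer(queue);

            TextMessage message =
                    session.createTextMessage("ItemAddedToBasket:42");
            producer.send(message);          // the event data is the payload
        } finally {
            connection.close();              // also closes session and producer
        }
    }
}
\end{verbatim}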
%\subsection{Streaming Broker}
\begin{description}
\item [Amazon SQS] \hfill \\
{
Amazon Simple Queue Service (SQS) is a cloud service which offers a simple
message queue to move data between distributed systems. SQS guarantees
reliability by redelivering messages in case of failure; once a message has been
successfully processed by a consumer, it is deleted from the queue. Amazon SQS
does not guarantee strict ordering of messages. All messages are stored
redundantly on multiple servers and in multiple data centers.
\cite{amazonSQS} \cite{amazonSQSFaq}
SQS by itself cannot provide the publish-subscribe behaviour available in typical
message brokers. In combination with the Amazon Simple Notification Service
(SNS), however, it is possible to create topics linked to SQS queues, with which
consumers can register themselves to receive messages from a specific topic only.
(A sketch of the SQS receive-process-delete cycle follows this list.)
\cite{amazonSqsPubSub}
}
\item [Amazon Kinesis] \hfill \\
{
Amazon Kinesis is a cloud service for streaming event data for further
processing and is very similar to Apache Kafka. Producer applications push data
continuously to the Kinesis broker, where the records are kept for 24 hours.
Kinesis provides ordering of records, as well as the ability to read and/or
replay records in the same order to multiple Amazon Kinesis applications.
\cite{amazonKinesis} \cite{amazonKinesisFAQ}
\begin{figure}[H]
\centering
\includegraphics[width=0.7\textwidth]{images/amazon-kinesis.png}
\caption{Amazon Kinesis aggregates events and interacts with other
Amazon Cloud services \cite{amazonKinesis}}
\label{fig:amazon-kinesis}
\end{figure}
}
%\item [Scribe] \hfill \\
%{ Push based, no pub-sub, static configuration}
%\item [Kastrell] \hfill \\
%{no clustering; no pub-sub}
\item [Apache Flume] \hfill \\
{Flume is a distributed service specialized in reliably getting event data into
HDFS. A typical deployment consists of a number of logical nodes arranged into
three tiers. The first tier is the agent tier: agent nodes are typically
installed on the machines that generate the data and are the initial point of
contact with Flume. They forward data to the next tier of collector nodes, which
aggregate the separate data flows and forward them to the final storage tier.
Flume is push-based and does not support publish-subscribe semantics.
\cite{apacheflumeDoc}
\begin{figure}[H]
\centering
\includegraphics[width=0.4\textwidth]{images/flume-architecture.png}
\caption{Apache Flume general architecture \cite{apacheflumeDoc}}
\label{fig:flume-architecture}
\end{figure}
}
\end{description}
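
To illustrate the receive-process-delete cycle described for Amazon SQS above,
here is a minimal consumer sketch using the AWS SDK for Java v2. It is an
illustrative example only; the queue URL is a placeholder and credentials and
region are assumed to be provided by the environment.
\begin{verbatim}
import java.util.List;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

// Minimal SQS consumer sketch: receive messages, process them, then delete
// them so they are not redelivered. The queue URL is a placeholder.
public class SqsEventConsumer {
    public static void main(String[] args) {
        String queueUrl =
                "https://sqs.eu-west-1.amazonaws.com/123456789012/events";
        try (SqsClient sqs = SqsClient.create()) {
            List<Message> messages = sqs.receiveMessage(
                    ReceiveMessageRequest.builder()
                            .queueUrl(queueUrl)
                            .maxNumberOfMessages(10)
                            .build())
                    .messages();

            for (Message message : messages) {
                System.out.println("Processing event: " + message.body());
                // Only after successful processing is the message removed;
                // otherwise SQS redelivers it after the visibility timeout.
                sqs.deleteMessage(DeleteMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .receiptHandle(message.receiptHandle())
                        .build());
            }
        }
    }
}
\end{verbatim}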