\chapter{Implementation Broker}
\label{chap:broker}
The broker implementation is a stand-alone server application based on the
Apache Kafka protocol implementation (see chapter \ref{sec-protocol}). To clarify the
program flow from receiving data from the network, to handling the requests, and
to accessing the file system, the broker is split into three layers, each with its
own module.
\begin{description}
\item [Network Layer] \hfill \\
This module encapsulates actions on the network. It establishes
connections and receives bytes from clients. The received bytes are then
chunked into single requests and passed to the API Layer.
\item [API Layer] \hfill \\
This module handles incoming requests. It first parses the
received bytes and then proceeds to an appropriate action,
depending on the delivered API key. After performing the action, an
appropriate response is generated and provided to the Network Layer
to send back to the client.
\item [Log Layer] \hfill \\
This module encapsulates actions on the log in the file system.
Its fundamental functions are appending messages to the log and reading from
it.
\end{description}
\begin{figure}[H]
\centering
\includegraphics[width=0.45\textwidth]{images/impl-brok-layers.png}
\caption{The three layers of the broker server application}
\label{fig:impl-brok-layers}
\end{figure}
\section{Server Architecture}
\label{sec:impl-broker-threading}
The server architecture of the broker application consists of the
following components, each running in its own thread:
\begin{description}
\item [Main Thread (single instance)] \hfill \\
The main function of the broker bootstraps the whole server application. It first
initializes the network layer and then forks the \textit{Acceptor}, \textit{Responder} and \textit{API Handler}
threads. Started threads are managed with the \lstinline{withAsync} function of
the \fnurl{Control.Concurrent.Async}
{https://hackage.haskell.org/package/async-2.0.0.0/docs/Control-Concurrent-Async.html}
library. It adds a thin layer over the basic concurrency operations provided
by \fnurl{Control.Concurrent}
{https://hackage.haskell.org/package/base-4.5.0.0/docs/Control-Concurrent.html}
and gives the ability to wait for the return value of a thread. This
provides additional safety and control if a thread crashes.
\item[Acceptor Thread (single instance), Network Layer] \hfill \\
This thread accepts new client connections (see
\ref{sec:impl-broker-socket-connection}) in a running loop. To support
multiple connections from different clients, it forks a new thread for
processing incoming data whenever a new connection is established.
\item[Connection Processor Thread (instance per connection), Network Layer] \hfill \\
The incoming data needs to be processed as fast as possible. If the
limit of the socket buffer is reached, the throughput on the network drops
dramatically. The processor thread constantly receives requests from a
given connection (see \ref{sec:impl-broker-socket-receive}) and passes them
to the API handler thread. If the connection is closed, the thread
terminates.
\item[Responder Thread (single instance), Network Layer] \hfill \\
This thread works through the provided responses and sends them back to the original
client (see \ref{sec:impl-broker-socket-send}). For now, there is only
one instance of this thread. In the future, one instance per connection should be
forked. The challenge would then be to stop a responder thread properly after
the connection to a client is closed.
\item [API Handler Thread (single instance)] \hfill \\
The API handler is a worker thread which works through the received requests. It
also builds appropriate responses and passes them to the responder thread
(see \ref{sec:impl-broker-api-handle}). It is conceivable to run more than
a single instance of the API handler. Because this would require more advanced
synchronisation, it is not realized yet.
\end{description}
The original Apache Kafka uses thread pools with a fixed, configurable number of
threads for handling network requests. Thanks to the optimizations of
the GHC IO manager, the interaction between Haskell threads and the OS is
very efficient due to thread multiplexing. Thread pooling therefore offers no
further advantage over one socket processor thread instance per connection.

For the transfer of data between the layers, channels are used. The
\fnurl{Control.Concurrent.Chan}
{http://hackage.haskell.org/package/base-4.8.0.0/docs/Control-Concurrent-Chan.html}
module provides a one-way FIFO communication channel. This concept is used to
separate the three layers from each other. The fact that a \textit{Chan} is
unbounded brings a risk. While \textit{writeChan}, which is used to write
to a channel, succeeds immediately, there is a chance that the consuming thread
is not able to read the same amount of data in a given time, and thus the channel
will grow in an unchecked manner. \cite{o2008real}
To isolate the network layer from any build or parse process, the transferred
data is a lazy \lstinline{ByteString}. Because the response to a
particular request needs to be sent back over the same connection, the data
passed to a channel is defined as a tuple of the corresponding socket
connection information and the received bytes:
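\begin{lstlisting}[caption={Channel types for threading}]
import Control.Concurrent.Chan (Chan)
import Network.Socket (Socket, SockAddr)
import qualified Data.ByteString.Lazy as B

-- A message carries the connection it belongs to and the raw bytes.
type ChanMessage = ((Socket, SockAddr), B.ByteString)
type RequestChan = Chan ChanMessage
type ResponseChan = Chan ChanMessage
\end{lstlisting}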
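
As a minimal sketch of how two layers can be decoupled by such channels, the
following example pushes one request through a request channel to a worker
thread and reads the result from a response channel. To keep it
self-contained, an \lstinline{Int} stands in for the connection information.
The names \lstinline{Message}, \lstinline{apiHandler} and
\lstinline{roundTrip}, as well as the \lstinline{ack:} prefix, are
illustrative assumptions and not part of the broker.

\begin{lstlisting}[caption={Sketch: passing messages between threads via channels}]
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Monad (forever)
import qualified Data.ByteString.Lazy.Char8 as C

-- Simplified message: an Int stands in for (Socket, SockAddr).
type Message = (Int, C.ByteString)

-- Worker loop: take a request, build a response, hand it on.
apiHandler :: Chan Message -> Chan Message -> IO ()
apiHandler requests responses = forever $ do
  (conn, payload) <- readChan requests
  writeChan responses (conn, C.append (C.pack "ack:") payload)

-- Push one request through the pipeline and wait for its response.
roundTrip :: Message -> IO Message
roundTrip msg = do
  requests  <- newChan
  responses <- newChan
  _ <- forkIO (apiHandler requests responses)
  writeChan requests msg
  readChan responses
\end{lstlisting}

Since \lstinline{readChan} blocks until a value is available, neither side
needs to poll. The connection identifier travels with the payload, so the
response can be matched to the connection it came from.
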
\begin{figure}[H]
\centering
\includegraphics[width=1\textwidth]{images/impl-brok-threading.png}
\caption{Internals of broker server application}
\label{fig:impl-brok-threading}
\end{figure}
\newpage
\section{Network Layer}
\label{sec:broker-network}
The broker application has a thin network layer which is responsible for
receiving and sending binary data to clients. Instead of using HTTP,
connection-oriented sockets (TCP) are used. Therefore, the broker is independent
from any HTTP implementation, and clients can make use of advanced TCP features
like the ability to simultaneously poll many connections or multiplex requests.
Since a socket is an endpoint of a bidirectional inter-process communication
flow, each connection established to the broker is essentially a socket
connection. It is therefore important to provide a reliable socket server
implementation which serves correctly under heavy load. Haskell provides
full control over sockets using the
\fnurl{Network.Socket}{https://hackage.haskell.org/package/network-2.3.0.7/docs/Network-Socket.html}
library which exposes the \fnurl{C socket
API}{http://pubs.opengroup.org/onlinepubs/7908799/xns/syssocket.h.html}.
This section shows the basic operations and describes the used libraries in the
network layer. For further details about the threading concept, see section
\ref{sec:impl-broker-threading}.
\subsection{Socket Connection Establishment}
\label{sec:impl-broker-socket-connection}
\subsubsection{Initialize Socket}
The server first creates a socket with the following configuration parameters:
\begin{itemize}
\item {\bf Protocol Family:} \textit{AF\_INET}, for network protocol IPv4.
\item {\bf Socket Type:} \textit{Stream}, which provides sequenced, reliable, two-way, connection-based byte streams.
\item {\bf Protocol Number:} \textit{0}, which indicates that the caller does not want to specify the protocol, leaving it up to the service provider.
\end{itemize}
\subsubsection{Bind Socket}
After the configuration is set, the socket has to be associated with an address
structure, which is a combination of an IP address and a port. The constructor
\textit{SockAddrInet} of data type \textit{SockAddr} takes the following
two arguments:
\begin{itemize}
\item {\bf Port Number} 4343 (currently static, but could be made configurable)
\item {\bf Host Address} iNADDR\_ANY, which binds the socket to all interfaces.
\end{itemize}
\subsubsection{Listen}
Once the socket is created and bound to an interface, it can be
put into the listening state. The only configuration parameter which has to be
considered is the maximum number of queued connections waiting to
be accepted (also called the backlog). Although this parameter is not critical in the
setup of this broker, we set the queue length to \textbf{50}, which is
also the default value in the Java SocketServer implementation. \textit{Note
that the focus remains on the amount of data being processed rather than the
number of clients being served.}
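
The three steps above (initialize, bind, listen) can be sketched as follows.
This is a minimal example under stated assumptions rather than the broker's
actual code: the function name \lstinline{openListener} is hypothetical, the
port is passed as a parameter instead of being fixed to 4343, and
\lstinline{tupleToHostAddress (0, 0, 0, 0)} is used for the wildcard address,
which corresponds to \lstinline{iNADDR_ANY} in newer versions of the network
library.

\begin{lstlisting}[caption={Sketch: creating, binding and listening on a socket}]
import Control.Exception (bracketOnError)
import Network.Socket

-- Create, bind and listen; the socket is closed again if a step fails.
openListener :: PortNumber -> IO Socket
openListener port =
  bracketOnError (socket AF_INET Stream defaultProtocol) close $ \sock -> do
    -- Wildcard address 0.0.0.0: accept connections on all interfaces.
    bind sock (SockAddrInet port (tupleToHostAddress (0, 0, 0, 0)))
    -- Backlog of 50 pending connections, as described above.
    listen sock 50
    return sock
\end{lstlisting}

Calling \lstinline{openListener 4343} would yield a socket ready for
\lstinline{accept}. Passing port 0 lets the operating system choose a free
port, which is convenient for tests.
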
\subsection{Receive Data}
\label{sec:impl-broker-socket-receive}
To receive data from the socket buffer, the function \lstinline{recv} from the
\fnurl{Network.Socket.ByteString.Lazy}
{https://hackage.haskell.org/package/network-2.3.0.1/docs/Network-Socket-ByteString-Lazy.html}
library, which provides access to the Unix socket interface, is used. Because the underlying
protocol is in binary format and the incoming data later needs to be parsed
directly from type \lstinline{ByteString}, this module is more efficient than
the string-based network functions. We chose the lazy variant of the library
because the input is parsed directly by our protocol implementation, which is
based on lazy bytestrings.

Because each incoming request needs to be handled individually, the exact
number of bytes that make up one particular request first needs
to be determined. Therefore the first four bytes, which define the request
size according to the protocol, are received first and parsed into a numeric
value.
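\begin{lstlisting}[caption={Receiving a request from socket}]
import Data.Binary.Get (getWord32be, runGet)
import Data.Int (Int64)
import Network.Socket (Socket, SockAddr)
import qualified Network.Socket.ByteString.Lazy as S
import qualified Data.ByteString.Lazy as BL

recvFromSock :: (Socket, SockAddr) -> IO BL.ByteString
recvFromSock (sock, _) = do
  -- The first four bytes hold the request size (big-endian).
  respLen <- S.recv sock (4 :: Int64)
  let parsedLen = getLength respLen
  -- Read exactly that many bytes to obtain the request body.
  recvExactly sock parsedLen
  where
    getLength = runGet $ fromIntegral <$> getWord32be
\end{lstlisting}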
After the size of a particular request is known, that number of bytes
is received from the socket. Because of the blocking semantics of Unix sockets
and TCP packet fragmentation, it is not guaranteed that the
\lstinline{recv} system call returns exactly the number of bytes requested
in its \lstinline{len} argument. The call
may return less data than specified. Therefore the following
function is implemented to read the data in chunks until the entire request is
read.
\begin{lstlisting}[caption={Receive an exact number of bytes from socket}]
recvExactly :: Socket -> Int64 -> IO BL.ByteString
recvExactly sock size = BL.concat . reverse <$> loop [] 0
  where
    loop chunks bytesRead
      | bytesRead >= size = return chunks
      | otherwise = do
          chunk <- S.recv sock (size - bytesRead)
          if BL.null chunk
            then return chunks
            else loop (chunk:chunks) $! bytesRead + BL.length chunk
\end{lstlisting}
\newpage
\subsection{Send Data}
\label{sec:impl-broker-socket-send}
For every processed request, an appropriate response is sent back to the
client. The network layer does not need to generate responses; they
are provided as binary data by the API layer. The network layer
simply needs to send the bytes over the existing connection back
to the client. For sending data over the socket, the function \lstinline{sendAll} from
\fnurl{Network.Socket.ByteString.Lazy}
{https://hackage.haskell.org/package/network-2.3.0.1/docs/Network-Socket-ByteString-Lazy.html}
is used. This function continues to send data until either all data has been
sent or an error occurs. For further details about error handling, see section
\ref{sec:broker-error-handling}.
\begin{lstlisting}[caption={Send response back to client via socket}]
import Control.Exception (try)
import Network.Socket (Socket, SockAddr)
import qualified Data.ByteString.Lazy as B
import qualified Network.Socket.ByteString.Lazy as S

-- Send the whole response; an IOError is returned as Left, not thrown.
sendResponse :: (Socket, SockAddr) -> B.ByteString -> IO (Either IOError ())
sendResponse (socket, _) responsemessage =
  try (S.sendAll socket responsemessage)
\end{lstlisting}
\section{API Layer}
\label{sec:broker-api}
This part of the broker handles incoming
requests, initiates any actions needed, and prepares appropriate responses. It
plays an essential part in error handling, which is described in section
\ref{sec:broker-error-handling}.
\subsection{Decode Request}
\label{sec:impl-broker-api-handle}
After getting the next bytestring from the network layer, the incoming request first
gets decoded. For this part, the API layer simply uses functions provided by the
protocol implementation (see chapter \ref{sec-protocol}).
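
To illustrate what such a decoding step involves, the following sketch parses
the header of a Kafka request (API key, API version, correlation id, and a
length-prefixed client id) from a lazy bytestring whose four-byte size prefix
has already been stripped by the network layer. The record
\lstinline{RequestHeader} and the functions \lstinline{headerParser} and
\lstinline{decodeHeader} are hypothetical names chosen for this example; the
actual decoder is part of the protocol library and may be structured
differently.

\begin{lstlisting}[caption={Sketch: decoding a request header}]
import Data.Binary.Get
  (Get, getByteString, getWord16be, getWord32be, runGetOrFail)
import Data.Int (Int16, Int32)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL

-- Hypothetical header record, for illustration only.
data RequestHeader = RequestHeader
  { hdApiKey        :: Int16
  , hdApiVersion    :: Int16
  , hdCorrelationId :: Int32
  , hdClientId      :: BS.ByteString
  } deriving (Show, Eq)

headerParser :: Get RequestHeader
headerParser = do
  apiKey   <- fromIntegral <$> getWord16be
  version  <- fromIntegral <$> getWord16be
  corrId   <- fromIntegral <$> getWord32be
  -- The client id is prefixed by a signed 16-bit length; a negative
  -- length (null string) is treated as empty here.
  idLen    <- fromIntegral <$> getWord16be :: Get Int16
  clientId <- getByteString (max 0 (fromIntegral idLen))
  return (RequestHeader apiKey version corrId clientId)

-- Returns Left with a message instead of throwing on malformed input.
decodeHeader :: BL.ByteString -> Either String RequestHeader
decodeHeader bytes = case runGetOrFail headerParser bytes of
  Left  (_, _, msg)    -> Left msg
  Right (_, _, header) -> Right header
\end{lstlisting}

Using \lstinline{runGetOrFail} rather than \lstinline{runGet} turns a
truncated or malformed request into a value that the request handler can map
to an error, instead of an exception.
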
\subsection{Handling Requests}
The main focus of the API layer lies on handling the requests properly.
Depending on the API key field of type \lstinline{RequestMessage}, the request
handler determines an appropriate action. In case of a valid API key, an action
is performed and an encoded response message is passed back to the Network Layer.
For nearly all provided APIs, the broker needs access to the functions provided
by the log subsystem as described in section \ref{sec:impl-broker-log}.
Therefore, the API layer initializes the LogManager and passes it to the
handling function below:
\newpage
\begin{lstlisting}[caption={Handling requests depending on ApiKey}]
handleRequest :: RequestMessage -> LogManager.State
              -> IO (Either HandleError BL.ByteString)
handleRequest rm s =
  -- Dispatch on the API key; the log state is passed on to each handler.
  case rqApiKey rm of
    0  -> handleProduceRequest rm s
    1  -> handleFetchRequest rm s
    2  -> handleOffsetRequest rm s
    3  -> handleMetadataRequest rm s
    8  -> handleOffsetCommitRequest rm s
    9  -> handleOffsetFetchRequest rm s
    10 -> handleConsumerMetadataRequest rm s
    _  -> return (Left UnknownRqError)
\end{lstlisting}
\subsubsection{HandleProduceRequest}
\label{subsec:broker-api-producerequest}
Handling a produce request means that the API layer passes the contained
messages to the append function provided by the \lstinline{LogManager}. A
request may contain messages for multiple topics or partitions, and each
combination refers to a different log. Therefore, the job of the API layer is
to unnest the request into single calls to the \lstinline{LogManager}.
\begin{verbatim}
Simplified structure of nested sequences in a produce request:
[Topic [Partition [MessageSet]]]

Example:
[TopicA [Part0 [M], Part1 [M]], TopicB [Part0 [M]]]

Unnested to three tuples, each of which can be fed to LogManager as a single call:
[(TopicA, Part0, [M]), (TopicA, Part1, [M]), (TopicB, Part0, [M])]
\end{verbatim}
Thanks to Haskell's \textit{list comprehension}, the unnesting of the request
can be written quite elegantly:
\begin{lstlisting}[caption={List comprehension for unnesting produce request}]
[ (rqToName x, rqPrPartitionNumber y, rqPrMessageSet y ) | x <- rqPrTopics req, y <- rqToPartitions x ]
\end{lstlisting}
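
To make the unnesting concrete, the following self-contained sketch applies
the same kind of list comprehension to simplified stand-in types. The types
\lstinline{Topic} and \lstinline{Partition}, their field names, and the use of
\lstinline{String} for messages are assumptions made for this example; the
real request types come from the protocol library.

\begin{lstlisting}[caption={Sketch: unnesting topics and partitions with simplified types}]
-- Simplified, hypothetical stand-ins for the protocol types.
data Partition = Partition
  { partNumber   :: Int
  , partMessages :: [String]
  }

data Topic = Topic
  { topicName       :: String
  , topicPartitions :: [Partition]
  }

-- One tuple per topic-partition combination, in request order.
unnest :: [Topic] -> [(String, Int, [String])]
unnest topics =
  [ (topicName t, partNumber p, partMessages p)
  | t <- topics
  , p <- topicPartitions t
  ]

exampleRequest :: [Topic]
exampleRequest =
  [ Topic "TopicA" [Partition 0 ["m1"], Partition 1 ["m2"]]
  , Topic "TopicB" [Partition 0 ["m3"]]
  ]
\end{lstlisting}

Applied to \lstinline{exampleRequest}, \lstinline{unnest} yields three tuples,
one for each of the combinations TopicA/0, TopicA/1 and TopicB/0. The second
generator depends on the first, which is what flattens the nesting.
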
\subsubsection{HandleFetchRequest}
\label{subsec:broker-api-fetchrequest}
Analogous to the produce request, handling a fetch request leads to a corresponding
\lstinline{readLog} call on the \lstinline{LogManager}. The retrieved messages
are packed and encoded into a protocol-conformant fetch response.
Fetching data for multiple topics or partitions is not yet supported.
\subsubsection{HandleMetadataRequest}
Handling a request to the metadata API includes gathering information on the
broker system and provided topics and packing them to a metadata response.
Getting replication and partition related information is not yet implemented.
For retrieving a list of provided topics, the \lstinline{LogManager} is again
involved. The exposed function \lstinline{getTopicNames} analyses the existing
file structure and gives the available topics.
Information regarding the broker host includes its local IP address. For this purpose, the library
\fnurl{Network.Info}{https://hackage.haskell.org/package/network-info} is used:
\begin{lstlisting}[caption={Example of getting the broker's host address}]
import qualified Network.Info as N

-- Returns the interface named "eth0"; head fails if none exists.
getHost :: IO N.NetworkInterface
getHost = do
  ns <- N.getNetworkInterfaces
  return $ head $
    filter (\(N.NetworkInterface name _ _ _) -> name == "eth0") ns
\end{lstlisting}
\newpage
\section{Error Handling}
\label{sec:broker-error-handling}
A message broker relies fundamentally on error handling, as fault tolerance and
reliability are key requirements of such a system. The first part of this
section concentrates on the architectural details and decisions made regarding
error handling. Using a demonstration of a message flow, possible edge cases are
considered and uncovered in order to provide reliability to the user of this
broker. The second part covers the techniques used in Haskell to handle the
previously described concept and cases where errors may occur. It will also
involve part of the concept behind the error handling of Apache Kafka, which is
partly integrated in the Apache Kafka Protocol and thus adapted in this
implementation.
\subsection{Message Flow}
Figure \ref{fig:broker-error-activity} provides insight into
the process that an incoming request message goes through. For simplicity, one
may think of a message proceeding through all steps successively. In fact,
multiple requests are handled at the same time. Assuming the connection to the
broker is established and the client is ready to send requests to the broker,
there are two principal types of errors that may occur, SocketError and
RequestError. For each of them there is a separate handler which handles the
errors appropriately.
\begin{figure}[H]
\centering
\resizebox {0.7\linewidth} {!} {
\input{broker_activiy.tex}
}
\caption{Broker error handling concept}
\label{fig:broker-error-activity}
\end{figure}
\newpage
\subsubsection{Socket Error}
During the process of listening to an open connection and reading an incoming
byte stream, as described in \ref{sec:broker-network}, there is a chance of
unexpected errors occurring. In this case, the underlying C socket implementation
returns a result of \textit{-1}, which then results in
an \fnurl{IOError}{https://hackage.haskell.org/package/base-4.4.1.0/docs/System-IO-Error.html}.
Errors at this stage are handled by the \textit{Socket Error Handler} and
do not result in any response to the client who initiated the failing request,
as the socket connection may be broken. Instead, the error
is simply caught and reported as console output. An alternative would be to use a
separate logging framework such as
\fnurl{hslogger}{https://hackage.haskell.org/package/hslogger}. At worst, the
socket connection is closed.
\subsubsection{Request Error}
One step further, in the API stage (\ref{sec:broker-api}), past the point where socket
errors may occur, the bulk of the error handling begins. Each step the
request passes through may result in an error: parsing the request, writing
data to disk in the log subsystem, or finally producing the response message,
to name a few. All of these cases,
and others that fall into the same category, are handled by the
\textit{RequestErrorHandler}, where a set of possible errors or categories of
errors is defined, resulting in an appropriate response message being sent to the
client.
\subsection{Defining Error Types}
The previously introduced concept of the two error handlers, each responsible
for a certain layer of the application, can be built in Haskell conveniently by
using the \fnurl{Either
Monad}{https://hackage.haskell.org/package/category-extras-0.52.0/docs/Control-Monad-Either.html}
and \textit{pattern matching} of custom defined error types. This will allow taking
actions based on the given error type.
The error types of the network layer simply distinguish between errors
occurring while receiving and errors occurring while responding.
\begin{lstlisting}[caption={SocketError types}]
data SocketError =
    SocketRecvError String
  | SocketSendError String
  deriving Show
\end{lstlisting}
Any further details are not separated by sub-types. Instead, they can be
described in the first argument of the \lstinline{SocketError}, namely a
\lstinline{String}.
To handle errors during the process of request handling, the data type
\lstinline{HandleError} contains specific types for each edge case in any part
of the application. Those types can be considered an interface for errors
between the request handler and the underlying subsystems. These types are only
defined for the API layer, where the handling of requests occurs.
\begin{lstlisting}[caption={Handle Error types}]
data HandleError =
    PrWriteLogError Int String
  | PrPackError String
  | ParseRequestError String
  | FtReadLogError Int String
  | SendResponseError String
  | UnknownRqError
  deriving Show
\end{lstlisting}
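
How a failure in a subsystem can be turned into one of these error values may
be sketched as follows. The example wraps an arbitrary \lstinline{IO} action,
catches an \lstinline{IOException}, and maps it to
\lstinline{PrWriteLogError}. The function name \lstinline{writeWithOffset},
its offset parameter, and the choice of \lstinline{IOException} as the failure
signal are assumptions for this illustration; the broker's own mapping may
differ. The \lstinline{HandleError} type is repeated (with an additional
\lstinline{Eq} instance) only to keep the sketch self-contained.

\begin{lstlisting}[caption={Sketch: mapping an exception to a HandleError with Either}]
import Control.Exception (IOException, try)

data HandleError =
    PrWriteLogError Int String
  | PrPackError String
  | ParseRequestError String
  | FtReadLogError Int String
  | SendResponseError String
  | UnknownRqError
  deriving (Show, Eq)

-- Run an action; an IOException becomes a PrWriteLogError that
-- carries the given offset and the exception text.
writeWithOffset :: Int -> IO a -> IO (Either HandleError a)
writeWithOffset offset action = do
  result <- try action
  return $ case result of
    Left e  -> Left (PrWriteLogError offset (show (e :: IOException)))
    Right x -> Right x
\end{lstlisting}

The caller can then pattern match on the \lstinline{Left} case and pass the
error value on to an error handler, while the \lstinline{Right} case continues
the normal request flow.
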
\subsection{Error Handlers}
Knowing which kind of error a type belongs to, either socket or request
errors, it is now possible to create handler functions. They act as a central
place to which any kind of error can be sent and which handles it
appropriately. Figure \ref{fig:broker-error-activity-detail} illustrates
that the two error handlers choose their next action based on the
constructor of the error value, including its arguments, that is passed to the handler.
\begin{figure}[H]
\centering
\resizebox {0.7\linewidth} {!} {
\input{broker_activiy_extended.tex}
}
\caption{Broker error handling concept in detail}
\label{fig:broker-error-activity-detail}
\end{figure}
Assume that while processing an incoming \lstinline{ProduceRequest}, the log
subsystem fails to write data to the disk and returns a log-specific error. This
error is not handled directly. Instead, it is caught by the API layer,
which is in charge of mapping this error to one of the defined
\lstinline{HandleError} constructors. In this specific example, the request handler of the
API layer takes the error response from the log system, maps it to a
\lstinline{String} (if it is not one already), determines the offset of the
failing message, and passes the offset and the string to the constructor
\lstinline{PrWriteLogError} as the first and second argument.

The remainder of this section describes the functionality of the
\textit{Socket Error Handler} and the \textit{Request Error Handler},
provided by the functions \lstinline{handleSocketError} and
\lstinline{handleHandlerError}.
\subsubsection{Socket Error Handler}
The scope of handling a socket-related error is very limited, because
the connection may be broken or the client may not receive any more data
over the current connection. Given this, the current implementation only
prints the error to the console:
\begin{lstlisting}[caption={Handling error of type SocketError}]
handleSocketError :: (Socket, SockAddr) -> SocketError -> IO ()
handleSocketError (sock, sockaddr) (SocketRecvError e) = do
  putStrLn $ "[Socket Receive Error] " ++ e
handleSocketError (sock, sockaddr) (SocketSendError e) = do
  putStrLn $ "[Socket Send Error] " ++ e
\end{lstlisting}
While further logging of errors is out of scope for this thesis, the given
architecture is well suited to support it. At the point where the pattern
is matched, which currently calls \lstinline{putStrLn}, one
could inject an external logging service providing more meaningful information,
such as proposed in \fnurl{RFC5424}{http://tools.ietf.org/html/rfc5424}.
\subsubsection{Request Error Handler}
The Apache Kafka Protocol defines error codes (see
\ref{subsec:protocol-types-error-codes}), which should be applied to a response
message if the given request failed for some reason. Thus, the
\lstinline{handleHandlerError} function is responsible for providing the related
response message containing the appropriate error code. However, the current
implementation does not yet support such an error response message. Instead,
it prints a notification on the broker side, as shown in the code below:
\begin{lstlisting}[caption={Handling error of type HandleError}]
import qualified Data.ByteString.Lazy.Char8 as C
import qualified Network.Socket.ByteString.Lazy as S

handleHandlerError :: (Socket, SockAddr) -> HandleError -> IO ()
handleHandlerError (s, a) (ParseRequestError e) = do
  putStrLn $ "[ParseError]: " ++ e
handleHandlerError (s, a) (PrWriteLogError o e) = do
  putStrLn $ "[WriteLogError on offset " ++ show o ++ "]: " ++ show e
handleHandlerError (s, a) UnknownRqError = do
  putStrLn "[UnknownRqError]"
-- Fallback: report the error to the client, then close the connection.
handleHandlerError (s, a) e = do
  putStrLn $ show e
  S.sendAll s $ C.pack $ show e
  sClose s
  putStrLn $ "***Host " ++ show a ++ " disconnected ***"
\end{lstlisting}
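
A first step towards error responses could look like the following sketch,
which maps a \lstinline{HandleError} to a numeric error code and encodes it as
a signed 16-bit big-endian value. The functions \lstinline{errorCode} and
\lstinline{encodeErrorCode} are hypothetical, and the mapping itself is an
assumption made for illustration: read failures are reported as code 1
(offset out of range in the Kafka protocol), and everything else as -1
(unknown error). A complete response would additionally need the surrounding
response structure defined by the protocol.

\begin{lstlisting}[caption={Sketch: mapping a HandleError to an encoded error code}]
import Data.Binary.Put (putWord16be, runPut)
import Data.Int (Int16)
import qualified Data.ByteString.Lazy as BL

data HandleError =
    PrWriteLogError Int String
  | PrPackError String
  | ParseRequestError String
  | FtReadLogError Int String
  | SendResponseError String
  | UnknownRqError
  deriving Show

-- Assumed mapping, for illustration only.
errorCode :: HandleError -> Int16
errorCode (FtReadLogError _ _) = 1   -- offset out of range
errorCode _                    = -1  -- unknown error

-- Two bytes, big-endian, two's complement.
encodeErrorCode :: HandleError -> BL.ByteString
encodeErrorCode = runPut . putWord16be . fromIntegral . errorCode
\end{lstlisting}

Keeping the mapping in one pure function means that refining it later, for
example by giving parse errors their own code, does not affect the handlers
themselves.
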
\newpage
\section{Log Layer Subsystem}
\label{sec:impl-broker-log}
The subsystem of the log layer is responsible for the persistence of messages
produced and consumed. This component plays a decisive role while handling a
\textit{fetchRequest} (\ref{subsec:broker-api-fetchrequest}) as well as a
\textit{produceRequest} (\ref{subsec:broker-api-producerequest}). Thus, it is
used extensively in the API layer (\ref{sec:broker-api}). In this section the
applied concepts--which are adapted from Apache Kafka
(\ref{intro-kafka-log})--will be explained before a proper introduction to the
components of the log subsystem is given. Afterwards follows a code explanation
for the most important functionalities, where information about design
decisions and potential threats are given.
\subsection{Components}
Even if the log subsystem lives within the broker environment, it is, from an
architectural point of view, fully decoupled and relies only on the protocol
library.
\begin{figure}[H]
\centering
\includegraphics[width=0.55\textwidth]{images/broker-log-overview.png}
\caption{Architecture of Log Subsystem}
\label{fig:broker-log-overview}
\end{figure}
As illustrated in figure \ref{fig:broker-log-overview}, the \textit{LogManager}
acts as an interface to any component that uses functionalities of the log,
which in this case will be the API layer. The \textit{LogManager} is then able
to use exposed functionalities of the hidden modules \textit{Log} and
\textit{Index} (displayed as dashed).
Currently, the \textit{LogManager} provides functions to create a new log,
to append messages to an existing log, and to read messages from it. Further functionalities
are provided but are still managed by the \textit{Log} or \textit{Index} module
itself. Also exposed is the \lstinline{State} type, which is the in-memory representation
of the available logs, as discussed later in this section.
\begin{lstlisting}
module HMB.Internal.LogManager
  ( new
  , append
  , readLog
  , State
  ) where
\end{lstlisting}
The modules \textit{Log} and \textit{Index} contain functionalities with direct
access to the file system. The separation reflects the two file types,
log and index (see \ref{log-broker-storage}).

A small number of functions are used by both the \textit{Log} and the
\textit{Index} module. These are placed in the \textit{LogConfig} module,
which is intended to be imported qualified under a short name such as
\lstinline{L}, standing for Log.
\subsection{Storage Structure}
\label{log-broker-storage}
The storage structure is exactly the same as the one defined for Apache Kafka (see
\ref{intro-kafka-log}). Currently, the root location of the log is not
configurable and is set to the folder named \textbf{./log} within the installation
directory of the broker. Each folder represents a partition of a specific topic,
and the name of the folder is a combination of the topic name and the
partition number.
There are two types of files that reside within the folder specifying the
topic and partition:
\begin{itemize}
\item {\bf .log}: Containing a sequence of messages as binary data
\item{\bf .index}: Containing a sequence of index entries as binary data
\end{itemize}
Instead of holding only one storage file per topic-partition combination, the
log can be segmented into multiple files to optimize lookups.
Apache Kafka starts segmenting a log after it reaches a configurable size,
the default being one gigabyte. Therefore a topic-partition specific
directory can contain multiple pairs of both file types. The two files of a pair
share the same name, which stands for the base offset and can be
considered a unique identifier: a 64-bit integer written as a 20-character
file name. As defined for Apache Kafka, the \textbf{base
offset} is the offset of the first log entry stored in a file. For example, if
the first message in a log has the offset \textit{5 ::
Int64}, the file name of this log will be \textit{00000000000000000005.log} and
the related index file \textit{00000000000000000005.index}.
Using this naming convention and by viewing multiple log files, one can extract
information about what range of messages reside in which log. This is a very
efficient way to perform a lookup which is needed to append new log messages or
archive old messages.
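The naming convention can be illustrated with a minimal sketch. It is not
taken from the broker's code; the function name \lstinline{segmentFileName} is
hypothetical and only shows how a base offset could be rendered as a zero-padded,
20 character file name.
\begin{lstlisting}[caption={Sketch: deriving a segment file name from a base offset (hypothetical helper)}]
import Data.Int (Int64)
import Text.Printf (printf)

-- Hypothetical helper: render a base offset as a zero-padded,
-- 20 character name followed by the given extension.
segmentFileName :: Int64 -> String -> FilePath
segmentFileName baseOffset ext = printf "%020d.%s" baseOffset ext
\end{lstlisting}
Under these assumptions, \lstinline{segmentFileName 5 "log"} gives the log file name
of the example above and \lstinline{segmentFileName 5 "index"} the name of the
related index file.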
The following example shows a storage layout for the topics \textit{TopicA, TopicB
and TopicC} with the partitions \textit{0} and \textit{1}. Partition 0 of topics
A and B is already segmented into two pairs of index and log files.
\begin{verbatim}
log
|
+--TopicA_0
   |--00000000000000000000.index
   |--00000000000000000000.log
   |--00000000000008650021.index
   |--00000000000008650021.log
+--TopicA_1
   |--00000000000000000000.index
   |--00000000000000000000.log
+--TopicB_0
   |--00000000000000000000.index
   |--00000000000000000000.log
   |--00000000000128655021.index
   |--00000000000128655021.log
+--TopicB_1
   |--00000000000000000000.index
   |--00000000000000000000.log
+--TopicC_0
   |--00000000000000000000.index
   |--00000000000000000000.log
\end{verbatim}
\subsection{Storage Format}
\subsubsection{Log File}
The log file contains a sequence of messages. The on-disk format of an
entry is part of what makes Apache Kafka valuable. In fact, the
on-disk format is exactly the same as the format of the \lstinline{MessageSet}
(see \ref{impl-protocol-types-data}) transmitted with a
\lstinline{ProduceRequest} (\ref{subsec:protocol-types-producerequest}). Thus,
data does not have to be modified in any way and can be extracted and
written to the file system directly.
\subsubsection{Index File}
Every segment of a log has a corresponding index, stored as another file with
the suffix \textit{.index}. The log file contains the actual messages
structured in a message format (see \ref{impl-protocol-types-data}), and for each
message within this file the first 64 bits hold the incremented offset.
Searching this file for a message with a specific offset becomes very expensive,
since log files may grow into the range of gigabytes. To be able to produce
messages, the broker has to perform exactly this kind of lookup to determine the
latest offset and to assign correctly incremented offsets to incoming messages.
Fetch requests can likewise use the index to find the messages for a
given offset as fast as possible. This is why there is an index.
Basically, the index file contains simple entries with the following
structure:
\begin{verbatim}
Relative Offset (4 Bytes)
Physical Position (4 Bytes)
\end{verbatim}
The first field of an index entry, the relative offset, refers to a specific
message in the corresponding log file. The second field
represents the physical position of the referred message within the log file.
Instead of storing the whole eight bytes of the message offset, the index entry
only holds an offset relative to the base offset, which reduces the size
of the index. Note also that not every message within the
log is indexed. By default, an index entry is created only after
\textit{4096 Bytes} of log data.
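The eight byte layout of an index entry can be sketched as follows. This is an
illustration only and not the broker's actual serialization code: the function
names are hypothetical, and big-endian byte order (most significant byte first)
is an assumption.
\begin{lstlisting}[caption={Sketch: encoding and decoding an index entry (hypothetical helpers, big-endian assumed)}]
import Data.Bits (shiftL, shiftR, (.|.))
import Data.Word (Word32, Word8)

type RelativeOffset = Word32
type FileOffset     = Word32

-- Split a 32 bit word into four bytes, most significant byte first
-- (the byte order is an assumption of this sketch).
word32ToBytes :: Word32 -> [Word8]
word32ToBytes w = [fromIntegral (w `shiftR` s) | s <- [24, 16, 8, 0]]

-- Rebuild a 32 bit word from bytes, most significant byte first.
bytesToWord32 :: [Word8] -> Word32
bytesToWord32 = foldl (\acc b -> (acc `shiftL` 8) .|. fromIntegral b) 0

-- An index entry occupies exactly eight bytes:
-- four for the relative offset, four for the physical position.
encodeEntry :: (RelativeOffset, FileOffset) -> [Word8]
encodeEntry (rel, pos) = word32ToBytes rel ++ word32ToBytes pos

-- Decoding fails for input that is not exactly eight bytes long.
decodeEntry :: [Word8] -> Maybe (RelativeOffset, FileOffset)
decodeEntry bs
  | length bs == 8 = Just (bytesToWord32 (take 4 bs), bytesToWord32 (drop 4 bs))
  | otherwise      = Nothing
\end{lstlisting}
Since both fields are fixed in size, the $n$-th entry of an index file starts at
byte $8n$, which is what makes a binary search over the entries possible.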
As per the protocol specification, the request to fetch messages (see
\ref{subsubsec:protocol-fetchrequest}) holds the offset of the wanted message.
Given this offset, one can determine the base offset, as discussed later in
the log section (see \ref{subsubsec:broker-log-general-baseoffset}). Given
a valid base offset as well as the actual offset of the requested
message, the relative offset of this message follows directly:
\begin{verbatim}
Relative Offset = Given Offset - Base Offset
\end{verbatim}
Given the relative offset, a lookup over the index can be performed. Because
the index is significantly smaller than the log and resides in memory, this
lookup is reasonably fast. In fact, a binary
search over the index yields a time complexity of O(log \textit{n}), where
\textit{n} is the number of index entries.
A successful index lookup returns the physical position of the indexed
message closest to the requested message within the log. Because the
interval between index entries is fixed at a certain number of bytes, locating
the actual log message from the position of the closest indexed message is
bounded by that interval, resulting in a time complexity of O(1).
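The lookup described above can be sketched as follows. The index is modelled
here as a \lstinline{Data.Map} keyed by relative offset, which is an assumption
made for brevity and not necessarily how the broker stores its index; the
function names are hypothetical. \lstinline{Map.lookupLE} requires a
\textit{containers} version of 0.5 or later.
\begin{lstlisting}[caption={Sketch: finding the closest index entry for a requested offset (hypothetical helpers)}]
import Data.Int (Int64)
import qualified Data.Map as Map
import Data.Word (Word32)

type RelativeOffset = Word32
type FileOffset     = Word32

-- Relative Offset = Given Offset - Base Offset
-- (assumes the given offset is not smaller than the base offset).
relativeOffset :: Int64 -> Int64 -> RelativeOffset
relativeOffset given base = fromIntegral (given - base)

-- Closest index entry at or before the requested relative offset.
-- Map.lookupLE runs in O(log n) for n index entries.
closestEntry :: RelativeOffset
             -> Map.Map RelativeOffset FileOffset
             -> Maybe (RelativeOffset, FileOffset)
closestEntry = Map.lookupLE
\end{lstlisting}
For an index with the entries (0, 0), (40, 4096) and (85, 8200) and a base offset
of 8650021, a request for offset 8650081 has the relative offset 60 and therefore
resolves to the entry (40, 4096). From that position, the log is read forward
until the requested message is reached.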
\subsection{Types}
To make the code more readable and easier to manage, separate types
are defined. These are only used in the log subsystem and are defined within the
module \textit{LogConfig}. The following table gives an overview of these types:
\begin{table}[H]
\resizebox{\textwidth}{!}{%
\begin{tabular}{|l|l|l|}
\hline
\textbf{Type synonym} & \textbf{Underlying type} & \textbf{Description} \\ \hline
TopicStr & String & Parsed topic name as String \\ \hline
PartitionNr & Int & Parsed partition number as Int \\ \hline
RelativeOffset & Data.Word32 & 4 Byte offset relative to the BaseOffset \\ \hline
FileOffset & Data.Word32 & 4 Byte physical offset of a Log file \\ \hline
OffsetPosition & (RelativeOffset, FileOffset) & Tuple of relative and physical offset \\ \hline
BaseOffset & Int & 8 Byte offset as Int \\ \hline
\end{tabular}
}
\caption{Types defined for log subsystem}
\end{table}
%\subsection{On-disk format}
%\subsubsection{Log entry}
%\subsubsection{Index entry}
%The structure of an entry within the index file describes only two fields, each of
%them 32 bit long:
%\begin{itemize}
% \item 4 Bytes: Offset, relative to the base offset (file name)
% \item 4 Bytes: Physical position, relative to the beginning of the log file
%\end{itemize}
%The relative offset of an index entry added with its base offset represents an
%actual message within the log. The second information, the physical position,
%then tells on what position the message resides within the log.
\subsection{In-memory Storage}
\label{subsec:broker-log-inmemory}
Working with disk alone, in a stateless fashion so to speak, is not only very
difficult to manage (especially regarding multi-threading) but also extremely
slow when dealing with thousands of requests in a short period of time (see). A
more convenient and powerful approach is needed to handle a huge
amount of data on a running broker system. Therefore, an intermediate in-memory
storage is required to optimize file accesses. An
efficient implementation of a key-value data structure, namely
\fnurl{Data.Map}{https://hackage.haskell.org/package/containers-0.4.0.0/docs/Data-Map.html},
seems suitable for both the log and the index. Data.Map provides all the
functionality this subsystem needs, with a time complexity of
O(\textit{log n}) for insertion and lookup. \\
Because a shared data structure is used for the log as well as the index,
appropriate locking is essential to handle
concurrent data access. For this purpose, the
\fnurl{MVar}{https://hackage.haskell.org/package/base-4.8.0.0/docs/Control-Concurrent-MVar.html}
provides a flexible and powerful locking primitive. An MVar can be thought of as
a box which is either empty or full. The \lstinline{takeMVar} operation removes
the value from a full MVar and returns it but waits (or blocks) if the MVar is
currently empty. Symmetrically, the \lstinline{putMVar} operation puts a value
into the MVar but blocks if the MVar is already full. The MVar is a fundamental
building block that generalizes many different communication and synchronization
patterns. As for the log subsystem, the MVar is used as a \textit{container for
shared mutable state}. This is a common design pattern in Haskell when several
threads need write access to some state where the state value represents an
ordinary immutable Haskell data structure stored in an MVar---in this case a map
of logs or indices. \cite{marlow2013parallel}
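The pattern of an MVar as a container for shared mutable state can be sketched
as follows. The state type and function names are hypothetical and simplified
(plain strings instead of message sets); they are not the broker's actual
definitions. \lstinline{modifyMVar_} from \textit{Control.Concurrent.MVar}
combines \lstinline{takeMVar} and \lstinline{putMVar} and restores the old value
if the update throws an exception.
\begin{lstlisting}[caption={Sketch: MVar holding an immutable map as shared state (hypothetical types)}]
import Control.Concurrent.MVar (MVar, modifyMVar_, newMVar, readMVar)
import qualified Data.Map as Map

-- Hypothetical, simplified state: topic and partition as key,
-- a list of message payloads as value.
type Key         = (String, Int)
type SharedState = MVar (Map.Map Key [String])

newState :: IO SharedState
newState = newMVar Map.empty

-- Append one message under the given key. While the update runs,
-- the MVar is empty, so other threads block until it is refilled.
appendEntry :: SharedState -> Key -> String -> IO ()
appendEntry state key msg =
  modifyMVar_ state (return . Map.insertWith (flip (++)) key [msg])

-- Take a consistent snapshot of the entries stored under a key.
entriesFor :: SharedState -> Key -> IO [String]
entriesFor state key = Map.findWithDefault [] key <$> readMVar state
\end{lstlisting}
Because the map itself is immutable, a reader that obtained a snapshot via
\lstinline{readMVar} is never affected by later updates.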
\newpage
\subsubsection{Log State}
The \lstinline{LogState} represents an MVar holding a map which uses a tuple of
\lstinline{String} and \lstinline{Int}--representing the topic and partition
combination--as key. The corresponding value is the actual log which is a list
of \lstinline{MessageSet}.
\begin{lstlisting}[caption={Definition of the log state (MVar)}]
import qualified HMB.Internal.LogConfig as L
type Logs = Map.Map (L.TopicStr, L.PartitionNr) Log
newtype LogState = LogState (MVar Logs)
\end{lstlisting}
\subsubsection{Index State}
Like the \lstinline{LogState}, the \lstinline{IndexState} represents an
MVar containing a map with the same key. However, the value of this map holds a
list of \lstinline{OffsetPosition}, whose name is adopted from Apache Kafka.
\lstinline{OffsetPosition} represents a single index entry and therefore holds a
tuple of the \lstinline{RelativeOffset} and \lstinline{FileOffset}.
\begin{lstlisting}[caption={Definition of the index state (MVar)}]
import qualified HMB.Internal.LogConfig as L
type Indices = Map.Map (L.TopicStr, L.PartitionNr) [OffsetPosition]
newtype IndexState = IndexState (MVar Indices)
\end{lstlisting}
\subsection{Determine Base Offset}
\label{subsubsec:broker-log-general-baseoffset}
This section gives an example of a commonly used function within the log
subsystem by explaining how the base offset is determined.
An incoming request, which at this point has already been parsed, contains the topic as
well as the partition number for the given set of messages. Thus, enough
information is provided to identify the location (in this case the path) of the
related log or index file. Because each log can be split into multiple files,
what is still missing is the file to which the messages should
be appended (in case of a produce request) or from which the messages
should be read (in case of a fetch request).
\subsubsection{Getting existing base Offsets from File System}
To get a list of all existing base offsets, several filters and string transformations
need to be applied to the list of files in the topic-partition specific log directory.
First of all, a list of files has to be
built. The library
\fnurl{System.Directory}{http://hackage.haskell.org/package/directory-1.2.2.1/docs/System-Directory.html}
provides the function \lstinline{getDirectoryContents}, which takes a file path as a
string and returns a list of all entries in the directory. This operation
performs I/O, and thus the return type is an IO monadic value. \textit{As
described on
\fnurl{hackage}{http://hackage.haskell.org/package/directory-1.2.2.1/docs/System-Directory.html\#v:getDirectoryContents},
there are several reasons why this operation may fail but, at this point, we do
not provide proper error handling.}
The function \lstinline{offsetFromFileName} can be mapped over a filtered list of
strings, giving the list of all base offsets within the directory. The
filter omits all files other than the ones ending with
\textit{".log"}, as well as the special directory entries \textit{"."} and \textit{".."}.
\begin{lstlisting}[caption={Determining all base offsets for given topic and partition}]
getBaseOffsets :: (TopicStr, Int) -> IO [BaseOffset]
getBaseOffsets (t, p) = do
  dirs <- getDirectoryContents $ getLogFolder (t, p)
  return $ map offsetFromFileName (filter isLogFile (filterRootDir dirs))
\end{lstlisting}
As shown in the code above, the file list is filtered before the base offsets
are extracted. The function \textit{offsetFromFileName}
extracts the offset as an \textit{Int} from a \textit{String} by dropping the
four-character suffix \textit{".log"} and reading the remainder as a number. As an example,
\textit{"00000000000000000005.log" :: String} will be transformed to
\textit{5 :: Int}:
\begin{lstlisting}[caption={Get numeric value (offset) from given string (filename)}]
offsetFromFileName :: [Char] -> BaseOffset
offsetFromFileName = read . reverse . snd . splitAt 4 . reverse
\end{lstlisting}
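Note that \lstinline{read} raises a runtime error if the remaining string is not
numeric. A total variant could look like the following sketch, which is not part
of the broker; the name \lstinline{parseBaseOffset} is hypothetical. It returns
\lstinline{Nothing} for every name that does not end with \textit{".log"} or
whose stem is not a number.
\begin{lstlisting}[caption={Sketch: total parsing of a base offset from a file name (hypothetical helper)}]
import Data.List (isSuffixOf)
import Text.Read (readMaybe)

-- Hypothetical total variant: parse the base offset of a ".log" file,
-- yielding Nothing for any other directory entry.
parseBaseOffset :: FilePath -> Maybe Int
parseBaseOffset name
  | ".log" `isSuffixOf` name = readMaybe (take (length name - 4) name)
  | otherwise                = Nothing
\end{lstlisting}
Combined with \lstinline{mapMaybe} from \textit{Data.Maybe}, such a function
could filter and convert the directory listing in a single pass.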
Finally, the \lstinline{Maybe Offset} argument distinguishes between two cases:
\lstinline{Nothing} requests the highest base offset, whereas
\lstinline{Just Offset} requests the next smaller \lstinline{BaseOffset} relative to the provided offset.
\begin{lstlisting}[caption={Get highest base offset existing of given topic and partition}]
getLastBaseOffset :: (TopicStr, Int) -> Maybe Offset -> IO BaseOffset
getLastBaseOffset (t, p) o = do
  bos <- getBaseOffsets (t, p)
  case o of
    Nothing -> return (maxOffset bos)
    Just o -> return (nextSmaller bos o)
\end{lstlisting}
\subsubsection{Getting right base Offset depending on Intent}
When producing messages, the wanted log segment is the one named after the
highest base offset, since new messages are appended to the end of the log. This
is why reading the content of this directory cannot be avoided. If no file
exists yet, a new log with the base offset zero is created.
\begin{lstlisting}[caption={Determining highest offset of given list}]
maxOffset :: [BaseOffset] -> BaseOffset
maxOffset [] = 0
maxOffset [x] = x
maxOffset xs = maximum xs
\end{lstlisting}
When fetching messages, the process is almost the same. The only
difference is that the wanted file is not the one with the highest base offset
but the one whose base offset is the next smaller one relative to the offset
provided in the fetch request. Let's assume one wants to fetch the message with
offset \textit{400}, yet there are two different pairs of log and index
files: \textit{100} and \textit{300}. The correct one would be \textit{300}, as the
message must reside within this file.
\begin{lstlisting}[caption={Determining offset which is next smaller regarding given offset}]
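nextSmaller :: [BaseOffset] -> BaseOffset -> BaseOffset
nextSmaller [] _ = 0
nextSmaller [x] _ = x
-- (<=) so that an offset equal to a base offset selects that segment
nextSmaller xs x = case filter (<= x) xs of
  [] -> 0 -- no segment starts at or before x
  ys -> maximum ys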
\end{lstlisting}
\subsection{Append to a Log}
\label{subsec:broker-log-append}
Writing data of type \lstinline{MessageSet} to a log is the fundamental process behind
handling a \lstinline{produceRequest}
(\ref{subsec:broker-api-producerequest}). Before the data is actually written to the
file system, several intermediate steps take place. These steps are a significant part of the
concept of Apache Kafka's log system and are adopted in this implementation.
This section describes the implementation details of these steps in
order of occurrence.
\begin{figure}[H]
\centering
\begin{sequencediagram}
%\newthread{broker}{Broker}
\newinst[0]{manager}{Log Manager}
\newinst[4]{log}{Log}
\newinst[4]{index}{Index}
\begin{call}
{manager}{(1) Find existing log}{log}{}
\end{call}
\begin{call}
{manager}{(2) Increment offset of new messages}{manager}{}
\end{call}
\begin{call}
{manager}{(3) Concatenate new messages to log}{manager}{}
\end{call}
\begin{call}
{manager}{(4) Determine last index}{log}{}
\end{call}
\begin{call}
{manager}{(5) Create index entry if interval is reached}{index}{}
\end{call}
\begin{call}
{manager}{(6) Append log}{log}{}
\end{call}
\end{sequencediagram}
\caption{The process of appending a message to the log}
\label{fig:broker-log-append}
\end{figure}
\subsubsection{LogManager}
The \textit{LogManager} is responsible for appending entries to a log and therefore exposes
the function \lstinline{append}, which takes the tuple of both states as well as
the topic and partition combination and the set of messages to append. As
illustrated in figure \ref{fig:broker-log-append}, not only the module \textit{Log} is involved
in this process but also the module \textit{Index}.
The concept behind the \lstinline{append} function is that the
\textit{LogManager} takes the value out of the MVar, applies the log and index
specific functions in a pure fashion, and puts the value back into the MVar
after the append process is completed. During this process, between
\lstinline{takeMVar} and \lstinline{putMVar}, the resource is locked. For
now, concurrent appending is not supported but may become possible in the
future with this concept. The following listing shows the computations
happening between taking and putting back the MVar value:
\begin{lstlisting}[caption={Append function exposed by LogManager}]
append :: (State, L.TopicStr, L.PartitionNr, Log) -> IO ()
append ((Log.LogState ls, Index.IndexState is), t, p, ms) = do
  logs <- takeMVar ls
  let log = Log.find (t, p) logs -- (1)
  let llo = fromMaybe (-1) (Log.lastOffset log)
  let recvLog = Log.continueOffset (llo + 1) ms -- (2)
  let newLog = log ++ recvLog -- (3)
  indices <- takeMVar is
  let index = Index.lastOrNull (Index.find (t, p) indices) -- (4)
  bo <- L.getBaseOffset (t, p) Nothing
  let lastIndexedOffset = fromIntegral (fst index) + (fromIntegral bo)
  if Index.isInterval (Log.sizeRange (Just lastIndexedOffset) Nothing newLog) -- (5)
    then do
      syncedIndices <- Index.append indices (t, p) newLog (Log.size newLog)
      putMVar is syncedIndices
    else do
      putMVar is indices
  let newLogs = Map.insert (t, p) newLog logs
  syncedLogs <- Log.append (t, p) newLogs -- (6)
  putMVar ls syncedLogs
\end{lstlisting}
\begin{description}
\item[(1)]
The log for the given topic and partition combination is
looked up among all existing logs in the in-memory data structure that have
not yet been persisted to disk (\ref{subsec:broker-log-inmemory}). Except
for the first message of a topic and partition combination, there will always be
at least one element within this map. The remaining element in this list is
required to later continue the offset number for the incoming