Tao Jiang edited this page Nov 30, 2020 · 7 revisions

Amazon Kinesis enables real-time processing of streaming data at massive scale. Kinesis Data Streams is useful for rapidly moving data off data producers and then continuously processing it, whether to transform the data before emitting it to a data store, to run real-time metrics and analytics, or to derive more complex data streams for further processing.

How Kinesis works

Benefits of Using Kinesis Data Streams

  • Enables ingesting, buffering, and processing streams of data in real time.
  • Enables architectural changes that transform a system from API-driven to event-driven.
  • Data put into a Kinesis data stream is stored durably and elastically.
  • Multiple Kinesis Data Streams applications can consume data from a stream, so that multiple actions, such as archiving and processing, can take place concurrently and independently.

Publishing and Consuming

  • To send data to the stream, configure producers using the Kinesis Streams PUT API operation or the Amazon Kinesis Producer Library (KPL).
  • For custom processing and analysis of real-time streaming data, use the Kinesis Client Library (KCL).

Kinesis Client Library

The Kinesis Client Library (KCL) enables fault-tolerant consumption of data from streams and provides scaling support for Kinesis Data Streams applications. KCL is built on top of the AWS Kinesis SDK rather than replacing it. For a simple application, the AWS Kinesis SDK provides an API for pulling data from a stream continuously, and it works reasonably well when the stream has only one shard. In a multi-shard scenario, however, the client developer must deal with a great deal of complexity:

  1. Connects to the stream
  2. Enumerates the shards
  3. Coordinates shard associations with other workers (if any)
  4. Instantiates a record processor for every shard it manages
  5. Pulls data records from the stream
  6. Pushes the records to the corresponding record processor
  7. Checkpoints processed records
  8. Balances shard-worker associations when the worker instance count changes
  9. Balances shard-worker associations when shards are split or merged

Therefore, in production, almost no one uses the AWS Kinesis SDK directly. For a better understanding of how Kinesis works, the complexity of Kinesis consumption, and especially how KCL solves the problem, see Marvin Theimer's AWS re:Invent presentation (BDT311). KCL is the de facto standard for Kinesis consumption. Unfortunately, the original KCL is a Java-only implementation, and there was no Go-native implementation of KCL.

In order to support other languages, AWS implemented a Java-based daemon, called MultiLangDaemon, that does all the heavy lifting. MultiLangDaemon is itself a Java KCL application that implements an internal record processor. The daemon spawns a sub-process, which in turn runs another record processor that can be written in any language. The MultiLangDaemon process and the record-processor sub-process communicate with each other over STDIN and STDOUT using a defined protocol. There is a one-to-one correspondence among record processors, child processes, and shards. The major drawbacks of this approach are:

  • Java dependency
  • Inefficiency: two record processors per shard, the internal one in Java and the external one in another language

VMware-Go-KCL

VMware-Go-KCL brings the Go/Kubernetes community a Go-native implementation of KCL that matches the API and functional specification of the original Java KCL v2.0 exactly, without the resource overhead of installing the Java-based MultiLangDaemon.

Interface:

type (
   // IRecordProcessor is the interface for the callback functions invoked by KCL.
   // The main task of using KCL is to provide an implementation of the IRecordProcessor interface.
   // Note: This is exactly the same interface as Amazon KCL IRecordProcessor v2
   IRecordProcessor interface {
      /**
       * Invoked by the Amazon Kinesis Client Library before data records are delivered to the RecordProcessor instance
       * (via processRecords).
       *
       * @param initializationInput Provides information related to initialization
       */
      Initialize(initializationInput *InitializationInput)
 
      /**
       * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver data records to the
       * application.
       * Upon failover, the new instance will get records with sequence number > checkpoint position
       * for each partition key.
       *
       * @param processRecordsInput Provides the records to be processed as well as information and capabilities related
       *        to them (eg checkpointing).
       */
      ProcessRecords(processRecordsInput *ProcessRecordsInput)
 
      /**
       * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this
       * RecordProcessor instance.
       *
       * <h2><b>Warning</b></h2>
       *
       * When the value of {@link ShutdownInput#getShutdownReason()} is
       * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason#TERMINATE} it is required that you
       * checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress.
       *
       * @param shutdownInput
       *            Provides information and capabilities (eg checkpointing) related to shutdown of this record processor.
       */
      Shutdown(shutdownInput *ShutdownInput)
   }
 
   // IRecordProcessorFactory is the interface for creating IRecordProcessor instances. Each Worker can have
   // multiple threads processing shards. The client can choose either to create one processor per shard
   // or to share processors among shards.
   IRecordProcessorFactory interface {
 
      /**
       * Returns a record processor to be used for processing data records for an (assigned) shard.
       *
       * @return Returns a processor object.
       */
      CreateProcessor() IRecordProcessor
   }
)

Usage Example:

func main() {
   kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID).
      WithInitialPositionInStream(cfg.LATEST).
      WithMaxRecords(10).
      WithMaxLeasesForWorker(1).
      WithShardSyncIntervalMillis(5000).
      WithFailoverTimeMillis(300000)
 
   // configure cloudwatch as metrics system
   metricsConfig := &metrics.MonitoringConfiguration{
      MonitoringService: "cloudwatch",
      Region:            regionName,
      CloudWatch: metrics.CloudWatchMonitoringService{
         // These values should come from kclConfig
         MetricsBufferTimeMillis: 10000,
         MetricsMaxQueueSize:     20,
      },
   }
 
   worker := wk.NewWorker(recordProcessorFactory(), kclConfig, metricsConfig)
   worker.Start()
 
   // Put some data into the stream.
   for i := 0; i < 100; i++ {
      // Use random string as partition key to ensure even distribution across shards
      err := worker.Publish(streamName, utils.RandStringBytesMaskImpr(10), []byte("hello world"))
      if err != nil {
         log.Printf("Error in Publish. %+v", err)
      }
   }
 
   // wait a few seconds before shutdown processing
   time.Sleep(10 * time.Second)
   worker.Shutdown()
}
 
// Record processor factory is used to create RecordProcessor
func recordProcessorFactory() kc.IRecordProcessorFactory {
   return &dumpRecordProcessorFactory{}
}
 
// dumpRecordProcessorFactory creates a simple record processor that dumps every record
type dumpRecordProcessorFactory struct {
}
 
func (d *dumpRecordProcessorFactory) CreateProcessor() kc.IRecordProcessor {
   return &dumpRecordProcessor{}
}
 
// dumpRecordProcessor prints out all data from each record.
type dumpRecordProcessor struct {
}
 
func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) {
   log.Printf("Processing ShardId: %v at checkpoint: %v", input.ShardId, aws.StringValue(input.ExtendedSequenceNumber.SequenceNumber))
}
 
func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
   log.Print("Processing Records...")
 
   // don't process empty record
   if len(input.Records) == 0 {
      return
   }
 
   for _, v := range input.Records {
      log.Printf("Record = %s", v.Data)
   }
 
   // checkpoint it after processing this batch
   lastRecordSequenceNumber := input.Records[len(input.Records)-1].SequenceNumber
   log.Printf("Checkpoint progress at: %v,  MillisBehindLatest = %v", lastRecordSequenceNumber, input.MillisBehindLatest)
   input.Checkpointer.Checkpoint(lastRecordSequenceNumber)
}
 
func (dd *dumpRecordProcessor) Shutdown(input *kc.ShutdownInput) {
   log.Printf("Shutdown Reason: %v", aws.StringValue(kc.ShutdownReasonMessage(input.ShutdownReason)))
 
   // When the value of {@link ShutdownInput#getShutdownReason()} is
   // {@link ShutdownReason#TERMINATE} it is required that you
   // checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress.
   if input.ShutdownReason == kc.TERMINATE {
      input.Checkpointer.Checkpoint(nil)
   }
}

Reference and Tutorial

Because VMware-Go-KCL matches the interface and programming model of the original Amazon KCL exactly, the best references and tutorials come from Amazon itself:

  • Developing Consumers Using the Kinesis Client Library
  • Troubleshooting
  • Advanced Topics

Support and Contact

Open source embodies a model for people to work together, building something greater than any of them could create alone. For any project-related issues and questions, please create an issue in the VMware-Go-KCL repository.