diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d9f9451 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +# IDE +.vscode +.history + +# Glide +glide.lock +vendor diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..c56ad54 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,42 @@ +sudo: false + +language: go + +go: + - 1.10.3 + +# Only clone the most recent commit +git: + depth: 1 + +env: + global: + - DEP_VERSION="0.5.0" + - GOLANGCI_LINT_VERSION="1.9.3" + +before_install: + # Download dep binary to $GOPATH/bin + - curl -L -s https://github.com/golang/dep/releases/download/v${DEP_VERSION}/dep-linux-amd64 -o $GOPATH/bin/dep + - chmod +x $GOPATH/bin/dep + # Download golangci-lint binary to $GOPATH/bin + - curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | bash -s -- -b $GOPATH/bin v${GOLANGCI_LINT_VERSION} + +install: + - dep ensure + +script: + - golangci-lint run -v + -E golint + -E interfacer + -E unconvert + -E dupl + -E goconst + -E gocyclo + -E maligned + -E misspell + -E lll + -E unparam + consumer/... + mocks/... + producer/... + - go test -v ./... diff --git a/Gopkg.lock b/Gopkg.lock new file mode 100644 index 0000000..f69be16 --- /dev/null +++ b/Gopkg.lock @@ -0,0 +1,231 @@ +# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. 
+ + +[[projects]] + digest = "1:1e283d7680f5395cddb88a422783c8877ff60c28fc1d8fde27c808f1720c512e" + name = "github.com/Shopify/sarama" + packages = ["."] + pruneopts = "UT" + revision = "f7be6aa2bc7b2e38edf816b08b582782194a1c02" + version = "v1.16.0" + +[[projects]] + digest = "1:526d64d0a3ac6c24875724a9355895be56a21f89a5d3ab5ba88d91244269a7d8" + name = "github.com/bsm/sarama-cluster" + packages = ["."] + pruneopts = "UT" + revision = "c618e605e15c0d7535f6c96ff8efbb0dba4fd66c" + version = "v2.1.15" + +[[projects]] + digest = "1:ffe9824d294da03b391f44e1ae8281281b4afc1bdaa9588c9097785e3af10cec" + name = "github.com/davecgh/go-spew" + packages = ["spew"] + pruneopts = "UT" + revision = "8991bc29aa16c548c550c7ff78260e27b9ab7c73" + version = "v1.1.1" + +[[projects]] + digest = "1:1f0c7ab489b407a7f8f9ad16c25a504d28ab461517a971d341388a56156c1bd7" + name = "github.com/eapache/go-resiliency" + packages = ["breaker"] + pruneopts = "UT" + revision = "ea41b0fad31007accc7f806884dcdf3da98b79ce" + version = "v1.1.0" + +[[projects]] + branch = "master" + digest = "1:79f16588b5576b1b3cd90e48d2374cc9a1a8776862d28d8fd0f23b0e15534967" + name = "github.com/eapache/go-xerial-snappy" + packages = ["."] + pruneopts = "UT" + revision = "776d5712da21bc4762676d614db1d8a64f4238b0" + +[[projects]] + digest = "1:444b82bfe35c83bbcaf84e310fb81a1f9ece03edfed586483c869e2c046aef69" + name = "github.com/eapache/queue" + packages = ["."] + pruneopts = "UT" + revision = "44cc805cf13205b55f69e14bcb69867d1ae92f98" + version = "v1.1.0" + +[[projects]] + branch = "master" + digest = "1:4a0c6bb4805508a6287675fac876be2ac1182539ca8a32468d8128882e9d5009" + name = "github.com/golang/snappy" + packages = ["."] + pruneopts = "UT" + revision = "2e65f85255dbc3072edf28d6b5b8efc472979f5a" + +[[projects]] + digest = "1:a1038ef593beb4771c8f0f9c26e8b00410acd800af5c6864651d9bf160ea1813" + name = "github.com/hpcloud/tail" + packages = [ + ".", + "ratelimiter", + "util", + "watch", + "winfile", + ] + pruneopts = "UT" + 
revision = "a30252cb686a21eb2d0b98132633053ec2f7f1e5" + version = "v1.0.0" + +[[projects]] + digest = "1:42e29deef12327a69123b9cb2cb45fee4af5c12c2a23c6e477338279a052703f" + name = "github.com/onsi/ginkgo" + packages = [ + ".", + "config", + "internal/codelocation", + "internal/containernode", + "internal/failer", + "internal/leafnodes", + "internal/remote", + "internal/spec", + "internal/spec_iterator", + "internal/specrunner", + "internal/suite", + "internal/testingtproxy", + "internal/writer", + "reporters", + "reporters/stenographer", + "reporters/stenographer/support/go-colorable", + "reporters/stenographer/support/go-isatty", + "types", + ] + pruneopts = "UT" + revision = "3774a09d95489ccaa16032e0770d08ea77ba6184" + version = "v1.6.0" + +[[projects]] + digest = "1:29f294e6a3b9d30629266b2765a8c203056387941e600405b8e8871c84653042" + name = "github.com/onsi/gomega" + packages = [ + ".", + "format", + "internal/assertion", + "internal/asyncassertion", + "internal/oraclematcher", + "internal/testingtsupport", + "matchers", + "matchers/support/goraph/bipartitegraph", + "matchers/support/goraph/edge", + "matchers/support/goraph/node", + "matchers/support/goraph/util", + "types", + ] + pruneopts = "UT" + revision = "b6ea1ea48f981d0f615a154a45eabb9dd466556d" + version = "v1.4.1" + +[[projects]] + digest = "1:29803f52611cbcc1dfe55b456e9fdac362af7248b3d29d7ea1bec0a12e71dff4" + name = "github.com/pierrec/lz4" + packages = [ + ".", + "internal/xxh32", + ] + pruneopts = "UT" + revision = "1958fd8fff7f115e79725b1288e0b878b3e06b00" + version = "v2.0.3" + +[[projects]] + digest = "1:40e195917a951a8bf867cd05de2a46aaf1806c50cf92eebf4c16f78cd196f747" + name = "github.com/pkg/errors" + packages = ["."] + pruneopts = "UT" + revision = "645ef00459ed84a119197bfb8d8205042c6df63d" + version = "v0.8.0" + +[[projects]] + branch = "master" + digest = "1:c4556a44e350b50a490544d9b06e9fba9c286c21d6c0e47f54f3a9214597298c" + name = "github.com/rcrowley/go-metrics" + packages = ["."] + 
pruneopts = "UT" + revision = "e2704e165165ec55d062f5919b4b29494e9fa790" + +[[projects]] + branch = "master" + digest = "1:fd35bc6003cfae47dd6041db172fb971f3e08991ecdf2971bd03dcabedef3b0b" + name = "golang.org/x/net" + packages = [ + "html", + "html/atom", + "html/charset", + ] + pruneopts = "UT" + revision = "8a410e7b638dca158bf9e766925842f6651ff828" + +[[projects]] + branch = "master" + digest = "1:19f92ce03256cc8a4467054842ec81f081985becd92bbc443e7604dfe801e6a8" + name = "golang.org/x/sys" + packages = ["unix"] + pruneopts = "UT" + revision = "4910a1d54f876d7b22162a85f4d066d3ee649450" + +[[projects]] + digest = "1:aa4d6967a3237f8367b6bf91503964a77183ecf696f1273e8ad3551bb4412b5f" + name = "golang.org/x/text" + packages = [ + "encoding", + "encoding/charmap", + "encoding/htmlindex", + "encoding/internal", + "encoding/internal/identifier", + "encoding/japanese", + "encoding/korean", + "encoding/simplifiedchinese", + "encoding/traditionalchinese", + "encoding/unicode", + "internal/gen", + "internal/tag", + "internal/utf8internal", + "language", + "runes", + "transform", + "unicode/cldr", + ] + pruneopts = "UT" + revision = "f21a4dfb5e38f5895301dc265a8def02365cc3d0" + version = "v0.3.0" + +[[projects]] + digest = "1:abeb38ade3f32a92943e5be54f55ed6d6e3b6602761d74b4aab4c9dd45c18abd" + name = "gopkg.in/fsnotify.v1" + packages = ["."] + pruneopts = "UT" + revision = "c2828203cd70a50dcccfb2761f8b1f8ceef9a8e9" + source = "https://github.com/fsnotify/fsnotify/archive/v1.4.7.tar.gz" + version = "v1.4.7" + +[[projects]] + branch = "v1" + digest = "1:0caa92e17bc0b65a98c63e5bc76a9e844cd5e56493f8fdbb28fad101a16254d9" + name = "gopkg.in/tomb.v1" + packages = ["."] + pruneopts = "UT" + revision = "dd632973f1e7218eb1089048e0798ec9ae7dceb8" + +[[projects]] + digest = "1:342378ac4dcb378a5448dd723f0784ae519383532f5e70ade24132c4c8693202" + name = "gopkg.in/yaml.v2" + packages = ["."] + pruneopts = "UT" + revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183" + version = "v2.2.1" + 
+[solve-meta] + analyzer-name = "dep" + analyzer-version = 1 + input-imports = [ + "github.com/Shopify/sarama", + "github.com/bsm/sarama-cluster", + "github.com/onsi/ginkgo", + "github.com/onsi/gomega", + "github.com/pkg/errors", + ] + solver-name = "gps-cdcl" + solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml new file mode 100644 index 0000000..be9650d --- /dev/null +++ b/Gopkg.toml @@ -0,0 +1,49 @@ +# Gopkg.toml example +# +# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md +# for detailed Gopkg.toml documentation. +# +# required = ["github.com/user/thing/cmd/thing"] +# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] +# +# [[constraint]] +# name = "github.com/user/project" +# version = "1.0.0" +# +# [[constraint]] +# name = "github.com/user/project2" +# branch = "dev" +# source = "github.com/myfork/project2" +# +# [[override]] +# name = "github.com/x/y" +# version = "2.4.0" +# +# [prune] +# non-go = false +# go-tests = true +# unused-packages = true + +[[override]] + source = "https://github.com/fsnotify/fsnotify/archive/v1.4.7.tar.gz" + name = "gopkg.in/fsnotify.v1" + +[[constraint]] + name = "github.com/Shopify/sarama" + version = "~1.16.0" + +[[constraint]] + name = "github.com/bsm/sarama-cluster" + version = "2.1.12" + +[[constraint]] + name = "github.com/onsi/ginkgo" + version = "1.6.0" + +[[constraint]] + name = "github.com/onsi/gomega" + version = "1.4.1" + +[prune] + go-tests = true + unused-packages = true diff --git a/README.md b/README.md new file mode 100644 index 0000000..255f1a7 --- /dev/null +++ b/README.md @@ -0,0 +1,89 @@ +## KafkaUtils for Sarama library + +This is just a simple Go library providing abstraction over Shopify's [Sarama][0] library. + +### Usage: + +* Install [dep][1] dependencies: + +``` +dep ensure +``` +* Import/Use in your code! 
+ +```Go +import github.com/TerrexTech/go-kafkautils/consumer // Import Consumer +import github.com/TerrexTech/go-kafkautils/producer // Import Producer +``` + +### Docs: + +* **[Consumer][2]** +* **[Producer][3]** + +### Minimal examples: + +#### Consumer: + +```Go +msgHandler := func(msg *sarama.ConsumerMessage, c *consumer.Consumer) { + // Convert from []byte to string + println("Received message: ", string(msg.Value)) + consumer := c.Get() + if !c.IsClosed() { + consumer.MarkOffset(msg, "") + } else { + log.Fatalln("Consumer was closed before offsets could be marked.") + } +} + +errHandler := func(e *error) { + log.Fatalln((*e).Error()) +} + +config := consumer.Config{ + ConsumerGroup: "test", + ErrHandler: errHandler, + KafkaBrokers: []string{"localhost:9092"}, + MsgHandler: msgHandler, + Topics: []string{"test"}, +} + +proxyconsumer, _ := consumer.New(&config) +proxyconsumer.EnableLogging() + +// Temporary hack for simplicity. Use channels/app-logic in actual application. +time.Sleep(100000 * time.Millisecond) +``` + +#### Producer: + +```Go +errHandler := func(err *sarama.ProducerError) { + errs := *err + fmt.Println(errs.Error()) +} +config := producer.Config{ + ErrHandler: errHandler, + KafkaBrokers: []string{"localhost:9092"}, +} +asyncProducer, err := producer.New(&config) +asyncProducer.EnableLogging() + +strTime := strconv.Itoa(int(time.Now().Unix())) +msg := asyncProducer.CreateKeyMessage("test", strTime, "testValue") + +if err != nil { + panic(err) +} +input, _ := asyncProducer.Input() +input <- msg + +// Temporary hack for simplicity. Use channels/app-logic in actual application. 
+time.Sleep(2000 * time.Millisecond) +``` + + [0]: https://github.com/Shopify/sarama + [1]: https://github.com/golang/dep + [2]: https://godoc.org/github.com/TerrexTech/go-kafkautils/consumer + [3]: https://godoc.org/github.com/TerrexTech/go-kafkautils/producer diff --git a/consumer/consumer.go b/consumer/consumer.go new file mode 100644 index 0000000..8130e61 --- /dev/null +++ b/consumer/consumer.go @@ -0,0 +1,203 @@ +package consumer + +import ( + "log" + "os" + "os/signal" + "syscall" + "time" + + "github.com/Shopify/sarama" + cluster "github.com/bsm/sarama-cluster" + "github.com/pkg/errors" +) + +// Adapter is the Kafka-Consumer interface +type Adapter interface { + Close() error + CommitOffsets() error + Errors() <-chan error + HighWaterMarks() map[string]map[int32]int64 + MarkOffset(msg *sarama.ConsumerMessage, metadata string) + MarkOffsets(s *cluster.OffsetStash) + MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) + Messages() <-chan *sarama.ConsumerMessage + Notifications() <-chan *cluster.Notification + Partitions() <-chan cluster.PartitionConsumer + ResetOffset(msg *sarama.ConsumerMessage, metadata string) + ResetOffsets(s *cluster.OffsetStash) + ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) + Subscriptions() map[string][]int32 +} + +// Config wraps configuration for consumer +type Config struct { + ConsumerGroup string + ErrHandler func(*error) + KafkaBrokers []string + MsgHandler func(*sarama.ConsumerMessage, *Consumer) + NtfnHandler func(*cluster.Notification) + // Allow overwriting default sarama-config + SaramaConfig *cluster.Config + Topics []string +} + +// Consumer wraps sarama-cluster's consumer +type Consumer struct { + consumer Adapter + isClosed bool + isLoggingEnabled bool +} + +// To facilitate testing. This var gets overwritten by custom +// init function. 
+// We don't pass the init function as argument or +// via dependency-injection because the purpose of +// this library is to highly abstract the kafka configs +var initFunc func([]string, string, []string, *cluster.Config) (*cluster.Consumer, error) + +func init() { + initFunc = cluster.NewConsumer +} + +// New returns a configured Sarama Kafka-Consumer instance +func New(initConfig *Config) (*Consumer, error) { + if initConfig.KafkaBrokers == nil || len(initConfig.KafkaBrokers) == 0 { + errorLogMsg := errors.New("error creating new ConsumerGroup, no Kafka Brokers set") + return nil, errorLogMsg + } + + var config *cluster.Config + if initConfig.SaramaConfig != nil { + config = initConfig.SaramaConfig + } else { + config = cluster.NewConfig() + config.Consumer.Offsets.Initial = sarama.OffsetNewest + config.Consumer.MaxProcessingTime = 10 * time.Second + config.Consumer.Return.Errors = true + config.Group.Return.Notifications = true + } + + consumer, err := initFunc(initConfig.KafkaBrokers, initConfig.ConsumerGroup, initConfig.Topics, config) + + if err != nil { + errorLogMsg := errors.Wrap(err, "Failed to join consumer group: "+initConfig.ConsumerGroup) + return nil, errorLogMsg + } + + proxyConsumer := Consumer{ + consumer: consumer, + isClosed: false, + isLoggingEnabled: false, + } + + // Don't run these functions when mocking consumer, + // where initial consumer is nil. + // This initialization is controlled by mock consumer. 
+ if consumer != nil { + proxyConsumer.handleKeyInterrupt() + proxyConsumer.handleErrors(initConfig.ErrHandler) + proxyConsumer.handleMessages(initConfig.MsgHandler) + proxyConsumer.handleNotifications(initConfig.NtfnHandler) + } + // log.Println("Consumer waiting for messages.") + return &proxyConsumer, nil +} + +// EnableLogging logs events to console +func (c *Consumer) EnableLogging() { + c.isLoggingEnabled = true +} + +// IsClosed returns a bool specifying if Kafka consumer is closed +func (c *Consumer) IsClosed() bool { + return c.isClosed +} + +// SaramaConsumerGroup returns the original Sarama Kafka consumer-group +func (c *Consumer) SaramaConsumerGroup() Adapter { + return c.consumer +} + +func (c *Consumer) handleKeyInterrupt() { + // Capture the Ctrl+C signal (interrupt or kill) + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + // Elegant exit + go func() { + <-sigChan + log.Println("Keyboard-Interrupt signal received.") + closeError := <-c.Close() + log.Println(closeError) + }() +} + +func (c *Consumer) handleErrors(errHandler func(*error)) { + consumer := c.SaramaConsumerGroup() + go func() { + for err := range consumer.Errors() { + if c.isLoggingEnabled { + log.Fatalln("Failed to read messages from topic:", err) + } + if errHandler != nil { + errHandler(&err) + } + } + }() +} + +func (c *Consumer) handleMessages(msgHandler func(*sarama.ConsumerMessage, *Consumer)) { + consumer := c.SaramaConsumerGroup() + go func() { + for message := range consumer.Messages() { + if c.isLoggingEnabled { + log.Printf("Topic: %s\t Partition: %v\t Offset: %v\n", message.Topic, message.Partition, message.Offset) + } + msgHandler(message, c) + } + }() +} + +// Consumer-Rebalancing notifications +func (c *Consumer) handleNotifications(ntfnHandler func(*cluster.Notification)) { + consumer := c.SaramaConsumerGroup() + go func() { + for ntf := range consumer.Notifications() { + if c.isLoggingEnabled { + 
log.Printf("Rebalanced: %+v\n", ntf) + } + if ntfnHandler != nil { + ntfnHandler(ntf) + } + } + }() +} + +// Close attempts to close the consumer, +// and returns any occurring errors over channel +func (c *Consumer) Close() chan error { + if c.IsClosed() { + return nil + } + + closeErrorChan := make(chan error, 1) + go func() { + err := c.SaramaConsumerGroup().Close() + if err != nil { + if c.isLoggingEnabled { + log.Fatal("Error closing consumer.", err) + } + closeErrorChan <- err + } + if c.isLoggingEnabled { + log.Println("Consumer closed.") + } + c.isClosed = true + }() + + return closeErrorChan +} diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go new file mode 100644 index 0000000..d886f07 --- /dev/null +++ b/consumer/consumer_test.go @@ -0,0 +1,272 @@ +package consumer + +import ( + "sync" + "testing" + "time" + + "github.com/TerrexTech/go-kafkautils/mocks" + + "github.com/Shopify/sarama" + cluster "github.com/bsm/sarama-cluster" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +// TestConsumer tests the critical Consumer functions +func TestConsumer(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Consumer Suite") +} + +var mockInitFunc = func([]string, string, []string, *cluster.Config) (*cluster.Consumer, error) { + return nil, nil +} + +func setupMockConsumer(initConfig *Config) (*Consumer, *mocks.Consumer, error) { + // Overwrite the original init function + initFunc = mockInitFunc + proxyConsumer, err := New(initConfig) + + fakeConsumer := &mocks.Consumer{ + ErrorsChan: make(chan error), + MessagesChan: make(chan *sarama.ConsumerMessage), + NotificationsChan: make(chan *cluster.Notification), + PartitionsChan: make(chan cluster.PartitionConsumer), + } + + proxyConsumer.consumer = fakeConsumer + // Re-attach the handlers + proxyConsumer.handleKeyInterrupt() + proxyConsumer.handleErrors(initConfig.ErrHandler) + proxyConsumer.handleMessages(initConfig.MsgHandler) + 
proxyConsumer.handleNotifications(initConfig.NtfnHandler) + return proxyConsumer, fakeConsumer, err +} + +// Unlike sarama, sarama-cluster does not provide a mock broker. +// So we'll have to interface the consumer more extensively. +// Be advised: Some minor hacks have been used to facilitate testing. +var _ = Describe("Consumer", func() { + Context("new instance is requested", func() { + var ( + config *Config + errHandler func(*error) + + proxyConsumer *Consumer + err error + ) + + BeforeEach(func() { + initFunc = mockInitFunc + + errHandler = func(err *error) { + Fail("Error occurred: " + (*err).Error()) + } + config = &Config{ + ConsumerGroup: "test-group", + KafkaBrokers: []string{"test-broker"}, + ErrHandler: errHandler, + Topics: []string{"test-topics"}, + } + proxyConsumer, _, err = setupMockConsumer(config) + }) + AfterEach(func() { + proxyConsumer.Close() + }) + + It("should return a new sarama-cluster consumer instance", func() { + Expect(*proxyConsumer).To(BeAssignableToTypeOf(Consumer{})) + Expect(err).To(BeNil()) + }) + + It("should return the error when initializing new consumer fails", func() { + initFunc = cluster.NewConsumer + _, err := New(config) + Expect(err).To(HaveOccurred()) + }) + + It("should return error when no brokers are provided", func() { + config = &Config{ + ErrHandler: errHandler, + } + _, err := New(config) + Expect(err).To(HaveOccurred()) + }) + }) + + Context("an error occurs while fetching messages", func() { + var ( + config *Config + errHandler func(*error) + isErrHandlerCalled bool + fakeConsumer *mocks.Consumer + ) + + BeforeEach(func() { + isErrHandlerCalled = false + errHandler = func(_ *error) { + isErrHandlerCalled = true + } + config = &Config{ + ConsumerGroup: "test-group", + KafkaBrokers: []string{"test-broker"}, + ErrHandler: errHandler, + Topics: []string{"test-topics"}, + } + _, fakeConsumer, _ = setupMockConsumer(config) + }) + AfterEach(func() { + fakeConsumer.Close() + }) + + It("should run the error-handler 
function", func() { + var wg sync.WaitGroup + wg.Add(1) + go func() { + fakeConsumer.ErrorsChan <- fakeConsumer.CreateMockError() + defer wg.Done() + }() + wg.Wait() + + // Wait for error-handler routines to complete + time.Sleep(5 * time.Millisecond) + Expect(isErrHandlerCalled).To(BeTrue()) + }) + }) + + Context("new message is received", func() { + var ( + config *Config + errHandler func(*error) + msgHandler func(*sarama.ConsumerMessage, *Consumer) + isMsgHandlerCalled bool + fakeConsumer *mocks.Consumer + ) + + BeforeEach(func() { + isMsgHandlerCalled = false + errHandler = func(err *error) { + Fail("Error occurred: " + (*err).Error()) + } + msgHandler = func(_ *sarama.ConsumerMessage, _ *Consumer) { + isMsgHandlerCalled = true + } + + config = &Config{ + ConsumerGroup: "test-group", + KafkaBrokers: []string{"test-broker"}, + ErrHandler: errHandler, + MsgHandler: msgHandler, + Topics: []string{"test-topics"}, + } + _, fakeConsumer, _ = setupMockConsumer(config) + }) + AfterEach(func() { + fakeConsumer.Close() + }) + + It("should run the message-handler function", func() { + var wg sync.WaitGroup + wg.Add(1) + go func() { + fakeConsumer.MessagesChan <- fakeConsumer.CreateMockMessage("test", "test", "test") + defer wg.Done() + }() + wg.Wait() + + // Wait for error-handler routines to complete + time.Sleep(5 * time.Millisecond) + Expect(isMsgHandlerCalled).To(BeTrue()) + }) + }) + + Context("new notification is received", func() { + var ( + config *Config + errHandler func(*error) + ntfnHandler func(*cluster.Notification) + isNtfnHandlerCalled bool + fakeConsumer *mocks.Consumer + ) + + BeforeEach(func() { + isNtfnHandlerCalled = false + errHandler = func(err *error) { + Fail("Error occurred: " + (*err).Error()) + } + ntfnHandler = func(_ *cluster.Notification) { + isNtfnHandlerCalled = true + } + + config = &Config{ + ConsumerGroup: "test-group", + KafkaBrokers: []string{"test-broker"}, + ErrHandler: errHandler, + NtfnHandler: ntfnHandler, + Topics: 
[]string{"test-topics"}, + } + _, fakeConsumer, _ = setupMockConsumer(config) + }) + AfterEach(func() { + fakeConsumer.Close() + }) + + It("should run the message-handler function", func() { + var wg sync.WaitGroup + wg.Add(1) + go func() { + fakeConsumer.NotificationsChan <- fakeConsumer.CreateMockNotification() + defer wg.Done() + }() + wg.Wait() + + // Wait for error-handler routines to complete + time.Sleep(5 * time.Millisecond) + Expect(isNtfnHandlerCalled).To(BeTrue()) + }) + }) + + Context("consumer is requested to be closed", func() { + var ( + config *Config + errHandler func(*error) + + proxyConsumer *Consumer + fakeConsumer *mocks.Consumer + ) + + BeforeEach(func() { + errHandler = func(err *error) { + Fail("Error occurred: " + (*err).Error()) + } + config = &Config{ + ConsumerGroup: "test-group", + KafkaBrokers: []string{"test-broker"}, + ErrHandler: errHandler, + Topics: []string{"test-topics"}, + } + proxyConsumer, fakeConsumer, _ = setupMockConsumer(config) + }) + AfterEach(func() { + proxyConsumer.Close() + }) + + It("should return nil if already closed", func() { + proxyConsumer.Close() + // Allow consumer to close + time.Sleep(5 * time.Millisecond) + Expect(proxyConsumer.Close()).To(BeNil()) + }) + + It("should close consumer if its open", func() { + Expect(proxyConsumer.IsClosed()).To(BeFalse()) + proxyConsumer.Close() + + // Allow consumer to close + time.Sleep(5 * time.Millisecond) + Expect(proxyConsumer.IsClosed()).To(BeTrue()) + Expect(fakeConsumer.IsClosed()).To(BeTrue()) + }) + }) +}) diff --git a/mocks/asnyc_producer.go b/mocks/asnyc_producer.go new file mode 100644 index 0000000..4d4f2f3 --- /dev/null +++ b/mocks/asnyc_producer.go @@ -0,0 +1,86 @@ +package mocks + +import ( + "github.com/pkg/errors" + + "github.com/Shopify/sarama" +) + +// AsyncProducer defines a mock async-producer. +// It provides writeable channels to mock various message-types. 
+type AsyncProducer struct { + ErrorsChan chan *sarama.ProducerError + InputChan chan *sarama.ProducerMessage + SuccessesChan chan *sarama.ProducerMessage + + // We don't call this var as "isClosed" because similar + // behavior is also shared by mock-consumer, and the names + // would conflict. + isProducerClosed bool +} + +// AsyncClose mocks the #AsyncClose function of sarama AsyncProducer +func (a *AsyncProducer) AsyncClose() { + a.Close() +} + +// Close mocks the #Close function of sarama AsyncProducer. +// This closes all the channels for this mock. +func (a *AsyncProducer) Close() error { + close(a.ErrorsChan) + close(a.InputChan) + close(a.SuccessesChan) + a.isProducerClosed = true + return nil +} + +// Errors mocks the #Errors function of sarama AsyncProducer. +// Returns a read-only error channel. The errors can +// be mocked by writing to ErrorsChan var of this struct. +func (a *AsyncProducer) Errors() <-chan *sarama.ProducerError { + return (<-chan *sarama.ProducerError)(a.ErrorsChan) +} + +// Input mocks the #Input function of sarama AsyncProducer. +// Returns a write-only input channel. The inputs can +// be mocked by reading from InputChan var of this struct. +func (a *AsyncProducer) Input() chan<- *sarama.ProducerMessage { + return (chan<- *sarama.ProducerMessage)(a.InputChan) +} + +// Successes mocks the #Successes function of sarama AsyncProducer. +// Returns a read-only successes channel. The successes can +// be mocked by writing to SuccessesChan var of this struct. 
+func (a *AsyncProducer) Successes() <-chan *sarama.ProducerMessage { + return (<-chan *sarama.ProducerMessage)(a.SuccessesChan) +} + +// CreateMockMessage creates a mock producer message +func (a *AsyncProducer) CreateMockMessage( + topic string, + key string, + value string, +) *sarama.ProducerMessage { + return &sarama.ProducerMessage{ + Topic: topic, + Key: sarama.StringEncoder(key), + Value: sarama.StringEncoder(value), + } +} + +// CreateMockError creates a mock producer error +func (a *AsyncProducer) CreateMockError( + topic string, + key string, + value string, +) *sarama.ProducerError { + return &sarama.ProducerError{ + Msg: a.CreateMockMessage(topic, key, value), + Err: errors.New("Some mock producer-error"), + } +} + +// IsClosed returns a bool specifying if the mock producer is closed +func (a *AsyncProducer) IsClosed() bool { + return a.isProducerClosed +} diff --git a/mocks/consumer.go b/mocks/consumer.go new file mode 100644 index 0000000..43f3c3a --- /dev/null +++ b/mocks/consumer.go @@ -0,0 +1,121 @@ +package mocks + +import ( + "github.com/pkg/errors" + + "github.com/Shopify/sarama" + "github.com/bsm/sarama-cluster" +) + +// Consumer defines a mock consumer. +// It provides writeable channels to mock various message-types. +type Consumer struct { + ErrorsChan chan error + MessagesChan chan *sarama.ConsumerMessage + NotificationsChan chan *cluster.Notification + PartitionsChan chan cluster.PartitionConsumer + + // We don't call this var as "isClosed" because similar + // behavior is also shared by mock-producer, and the names + // would conflict. + isConsumerClosed bool +} + +// Close mocks the #Close function of sarama-cluster Consumer. +// This closes all the channels for this mock. 
+func (c *Consumer) Close() error { + close(c.ErrorsChan) + close(c.MessagesChan) + close(c.NotificationsChan) + close(c.PartitionsChan) + c.isConsumerClosed = true + return nil +} + +// CommitOffsets mock is a no-op (currently) +func (c *Consumer) CommitOffsets() error { + return nil +} + +// Errors mocks the #Errors function of sarama-cluster Consumer. +// Returns a read-only error channel. The errors can +// be mocked by writing to ErrorsChan var of this struct. +func (c *Consumer) Errors() <-chan error { + return (<-chan error)(c.ErrorsChan) +} + +// HighWaterMarks mock is a no-op (currently) +func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { + return nil +} + +// MarkOffset mock is a no-op (currently) +func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {} + +// MarkOffsets mock is a no-op (currently) +func (c *Consumer) MarkOffsets(s *cluster.OffsetStash) {} + +// MarkPartitionOffset mock is a no-op (currently) +func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {} + +// Messages mocks the #Messages function of sarama-cluster Consumer. +// Returns a read-only messages channel. The messages can +// be mocked by writing to MessagesChan var of this struct. +func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage { + return (<-chan *sarama.ConsumerMessage)(c.MessagesChan) +} + +// Notifications mocks the #Notifications function of sarama-cluster Consumer. +// Returns a read-only notifications channel. The notifications can +// be mocked by writing to NotificationsChan var of this struct. +func (c *Consumer) Notifications() <-chan *cluster.Notification { + return (<-chan *cluster.Notification)(c.NotificationsChan) +} + +// Partitions mocks the #Partitions function of sarama-cluster Consumer. +// Returns a read-only partitions channel. The partitions can +// be mocked by writing to PartitionsChan var of this struct. 
+func (c *Consumer) Partitions() <-chan cluster.PartitionConsumer { + return (<-chan cluster.PartitionConsumer)(c.PartitionsChan) +} + +// ResetOffset mock is a no-op (currently) +func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {} + +// ResetOffsets mock is a no-op (currently) +func (c *Consumer) ResetOffsets(s *cluster.OffsetStash) {} + +// ResetPartitionOffset mock is a no-op (currently) +func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) { +} + +// Subscriptions mock is a no-op (currently) +func (c *Consumer) Subscriptions() map[string][]int32 { + return nil +} + +// IsClosed returns a bool specifying if the mock consumer is closed +func (c *Consumer) IsClosed() bool { + return c.isConsumerClosed +} + +// CreateMockError creates a mock consumer error +func (c *Consumer) CreateMockError() error { + return errors.New("Mock Consumer connection-error") +} + +// CreateMockMessage creates a mock consumer message +func (c *Consumer) CreateMockMessage(topic string, key string, value string) *sarama.ConsumerMessage { + return &sarama.ConsumerMessage{ + Key: []byte(key), + Value: []byte(value), + Topic: topic, + } +} + +// CreateMockNotification creates a mock consumer notification +func (c *Consumer) CreateMockNotification() *cluster.Notification { + return &cluster.Notification{ + Type: 1, + } +} diff --git a/producer/producer.go b/producer/producer.go new file mode 100644 index 0000000..d08ade5 --- /dev/null +++ b/producer/producer.go @@ -0,0 +1,169 @@ +package producer + +import ( + "log" + "os" + "os/signal" + "syscall" + + "github.com/Shopify/sarama" + "github.com/pkg/errors" +) + +// Adapter is the Kafka-Producer interface +type Adapter interface { + AsyncClose() + Close() error + Input() chan<- *sarama.ProducerMessage + Successes() <-chan *sarama.ProducerMessage + Errors() <-chan *sarama.ProducerError +} + +// Config wraps configuration for producer +type Config struct { + ErrHandler 
func(*sarama.ProducerError) + KafkaBrokers []string + // Allow overwriting default sarama-config + SaramaConfig *sarama.Config +} + +// Producer wraps sarama's producer +type Producer struct { + producer Adapter + isClosed bool + isLoggingEnabled bool +} + +// New returns a configured sarama Kafka-AsyncProducer instance +func New(initConfig *Config) (*Producer, error) { + if initConfig.KafkaBrokers == nil || len(initConfig.KafkaBrokers) == 0 { + return nil, errors.New("no Kafka Brokers set") + } + + var config *sarama.Config + if initConfig.SaramaConfig != nil { + config = initConfig.SaramaConfig + } else { + config = sarama.NewConfig() + config.Producer.Return.Errors = true + config.Producer.RequiredAcks = sarama.WaitForAll + config.Producer.Compression = sarama.CompressionNone + } + + producer, err := sarama.NewAsyncProducer(initConfig.KafkaBrokers, config) + if err != nil { + return nil, errors.Wrap(err, "Producer Connection Error:") + } + + proxyProducer := Producer{ + producer: producer, + isClosed: false, + isLoggingEnabled: false, + } + proxyProducer.handleKeyInterrupt() + proxyProducer.handleErrors(initConfig.ErrHandler) + return &proxyProducer, nil +} + +// EnableLogging logs events to console +func (p *Producer) EnableLogging() { + p.isLoggingEnabled = true +} + +// CreateKeyMessage creates producer-formatted message with key +func (p *Producer) CreateKeyMessage(topic string, key string, value string) *sarama.ProducerMessage { + msg := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.StringEncoder(value), + } + + if key != "" { + msg.Key = sarama.StringEncoder(key) + } + + return msg +} + +// CreateMessage creates keyless producer-formatted message +func (p *Producer) CreateMessage(topic string, value string) *sarama.ProducerMessage { + return p.CreateKeyMessage(topic, "", value) +} + +// IsClosed returns a bool specifying if Kafka producer is closed +func (p *Producer) IsClosed() bool { + return p.isClosed +} + +// SaramaProducer returns the 
original Sarama Kafka producer +func (p *Producer) SaramaProducer() *Adapter { + return &p.producer +} + +// Input takes Kafka messages to be produced +func (p *Producer) Input() (chan<- *sarama.ProducerMessage, error) { + if !p.IsClosed() { + return p.producer.Input(), nil + } + + err := errors.New("attemped to pass Input to closed producer") + return nil, err +} + +func (p *Producer) handleKeyInterrupt() { + // Capture the Ctrl+C signal (interrupt or kill) + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + // Elegant exit + go func() { + <-sigChan + // We always log here, special situation + log.Println("Keyboard-Interrupt signal received.") + closeError := <-p.Close() + log.Fatalln(closeError.Error()) + }() +} + +func (p *Producer) handleErrors(errHandler func(*sarama.ProducerError)) { + producer := *p.SaramaProducer() + go func() { + for err := range producer.Errors() { + if p.isLoggingEnabled { + log.Fatalln("Failed to produce message", err) + } + errHandler(err) + } + }() +} + +// Close attempts to close the producer, +// and returns any occurring errors over channel +func (p *Producer) Close() chan error { + // The error-channel only contains errors occurred + // while closing producer. Ignore if producer already + // closed. 
+ if p.IsClosed() { + return nil + } + + closeErrorChan := make(chan error, 1) + go func() { + producer := *p.SaramaProducer() + err := producer.Close() + if err != nil { + if p.isLoggingEnabled { + log.Fatal("Error closing async producer.", err) + } + closeErrorChan <- err + } + if p.isLoggingEnabled { + log.Println("Async Producer closed.") + } + p.isClosed = true + }() + + return closeErrorChan +} diff --git a/producer/producer_test.go b/producer/producer_test.go new file mode 100644 index 0000000..85e0855 --- /dev/null +++ b/producer/producer_test.go @@ -0,0 +1,260 @@ +package producer + +import ( + "sync" + "testing" + "time" + + "github.com/TerrexTech/go-kafkautils/mocks" + + "github.com/Shopify/sarama" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +// TestProducer tests the critical Producer functions +func TestProducer(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Producer Suite") +} + +func setupMockBroker() (*sarama.MockBroker, string) { + mockBroker := sarama.NewMockBroker(GinkgoT(), 0) + metadataResponse := new(sarama.MetadataResponse) + + metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID()) + metadataResponse.AddTopicPartition("test", 0, mockBroker.BrokerID(), nil, nil, sarama.ErrNoError) + mockBroker.Returns(metadataResponse) + + return mockBroker, mockBroker.Addr() +} + +func setupMockProducer(config *Config) (*Producer, *mocks.AsyncProducer) { + fakeProducer := mocks.AsyncProducer{ + ErrorsChan: make(chan *sarama.ProducerError), + InputChan: make(chan *sarama.ProducerMessage), + SuccessesChan: make(chan *sarama.ProducerMessage), + } + + asyncProducer, _ := New(config) + // Close the actual producer, since it will be replaced by a mock + asyncProducer.producer.Close() + asyncProducer.producer = &fakeProducer + asyncProducer.handleErrors(config.ErrHandler) + + return asyncProducer, &fakeProducer +} + +var _ = Describe("Producer", func() { + Context("new instance is requested", func() { + var ( + 
			mockBroker *sarama.MockBroker
			brokerAddr string
			config *Config
			errHandler func(*sarama.ProducerError)

			asyncProducer *Producer
			err error
		)

		BeforeEach(func() {
			// Setup a default basic producer-pipeline
			mockBroker, brokerAddr = setupMockBroker()

			// Any produce-error fails the spec outright.
			errHandler = func(err *sarama.ProducerError) {
				Fail("Error occurred: " + err.Error())
			}
			config = &Config{
				KafkaBrokers: []string{brokerAddr},
				ErrHandler: errHandler,
			}
			asyncProducer, err = New(config)
		})
		AfterEach(func() {
			asyncProducer.Close()
			mockBroker.Close()
		})

		It("should return a new sarama-producer instance", func() {
			Expect(*asyncProducer).To(BeAssignableToTypeOf(Producer{}))
			Expect(err).To(BeNil())
		})

		It("should return the error when initializing new producer fails", func() {
			config = &Config{
				KafkaBrokers: []string{"invalid-broker"},
				ErrHandler: errHandler,
			}
			_, err := New(config)
			Expect(err).To(HaveOccurred())
		})

		It("should return error when no brokers are provided", func() {
			config = &Config{
				ErrHandler: errHandler,
			}
			_, err := New(config)
			Expect(err).To(HaveOccurred())
		})
	})

	Context("input message is provided to producer", func() {
		var (
			mockBroker *sarama.MockBroker
			brokerAddr string
			config *Config
			errHandler func(*sarama.ProducerError)

			asyncProducer *Producer
			fakeProducer *mocks.AsyncProducer

			isMsgProduced bool
		)

		BeforeEach(func() {
			isMsgProduced = false
			// Setup a default basic producer-pipeline
			mockBroker, brokerAddr = setupMockBroker()

			errHandler = func(err *sarama.ProducerError) {
				Fail("Error occurred: " + err.Error())
			}
			config = &Config{
				KafkaBrokers: []string{brokerAddr},
				ErrHandler: errHandler,
			}
			asyncProducer, fakeProducer = setupMockProducer(config)
		})
		AfterEach(func() {
			asyncProducer.Close()
			mockBroker.Close()
		})

		It("should return error if producer is closed", func() {
			asyncProducer.Close()
			// Allow producer to close
			time.Sleep(5 *
time.Millisecond)

			input, err := asyncProducer.Input()
			Expect(input).To(BeNil())
			Expect(err).To(HaveOccurred())
		})

		It("should produce the message", func() {
			input, err := asyncProducer.Input()
			Expect(input).ToNot(BeNil())
			Expect(err).ToNot(HaveOccurred())

			// Read from mock-producer's Input channel to check
			// if the message was produced
			var wg sync.WaitGroup
			wg.Add(1)
			go func() {
				for range fakeProducer.InputChan {
					isMsgProduced = true
					wg.Done()
				}
			}()

			// Produce the message
			input <- fakeProducer.CreateMockMessage("test", "test", "test")
			wg.Wait()
			Expect(isMsgProduced).To(BeTrue())
		})
	})

	Context("an error occurs while producing messages", func() {
		var (
			mockBroker *sarama.MockBroker
			brokerAddr string
			config *Config
			errHandler func(*sarama.ProducerError)
			isErrHandlerCalled bool

			asyncProducer *Producer
			fakeProducer *mocks.AsyncProducer
		)

		BeforeEach(func() {
			// Setup a default basic producer-pipeline
			mockBroker, brokerAddr = setupMockBroker()

			// Handler just records that it ran; the spec asserts on the flag.
			errHandler = func(_ *sarama.ProducerError) {
				isErrHandlerCalled = true
			}
			config = &Config{
				KafkaBrokers: []string{brokerAddr},
				ErrHandler: errHandler,
			}
			asyncProducer, fakeProducer = setupMockProducer(config)
			isErrHandlerCalled = false
		})
		AfterEach(func() {
			asyncProducer.Close()
			mockBroker.Close()
		})

		It("should run the error-handler function", func() {
			var wg sync.WaitGroup
			wg.Add(1)
			go func() {
				fakeProducer.ErrorsChan <- fakeProducer.CreateMockError("test", "test", "test")
				defer wg.Done()
			}()
			wg.Wait()

			// Wait for error-handler routines to complete
			time.Sleep(5 * time.Millisecond)
			Expect(isErrHandlerCalled).To(BeTrue())
		})
	})

	Context("producer is requested to be closed", func() {
		var (
			mockBroker *sarama.MockBroker
			brokerAddr string
			config Config
			errHandler func(*sarama.ProducerError)

			asyncProducer *Producer
			fakeProducer *mocks.AsyncProducer
		)

		BeforeEach(func()
{
			// Setup a default basic producer-pipeline
			mockBroker, brokerAddr = setupMockBroker()

			errHandler = func(err *sarama.ProducerError) {
				Fail("Error occurred: " + err.Error())
			}
			config = Config{
				KafkaBrokers: []string{brokerAddr},
				ErrHandler: errHandler,
			}
			asyncProducer, fakeProducer = setupMockProducer(&config)
		})
		AfterEach(func() {
			asyncProducer.Close()
			mockBroker.Close()
		})

		It("should return nil if already closed", func() {
			asyncProducer.Close()
			// Allow producer to close
			time.Sleep(5 * time.Millisecond)
			// Second Close on an already-closed producer returns nil.
			Expect(asyncProducer.Close()).To(BeNil())
		})

		It("should close producer if its open", func() {
			Expect(asyncProducer.IsClosed()).To(BeFalse())
			asyncProducer.Close()

			// Allow producer to close
			time.Sleep(5 * time.Millisecond)
			Expect(fakeProducer.IsClosed()).To(BeTrue())
			Expect(asyncProducer.IsClosed()).To(BeTrue())
		})
	})

})