Skip to content

Commit 4c47a89

Browse files
authored
Merge pull request #3 from TerrexTech/v2.0.0-channels
Replace callbacks with channels, refactoring
2 parents d90cc17 + 46711c9 commit 4c47a89

File tree

5 files changed

+78
-222
lines changed

5 files changed

+78
-222
lines changed

README.md

Lines changed: 27 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,61 +26,56 @@ import github.com/TerrexTech/go-kafkautils/producer // Import Producer
2626
#### Consumer:
2727

2828
```Go
29-
msgHandler := func(msg *sarama.ConsumerMessage, c *consumer.Consumer) {
30-
// Convert from []byte to string
31-
println("Received message: ", string(msg.Value))
32-
consumer := c.Get()
33-
if !c.IsClosed() {
34-
consumer.MarkOffset(msg, "")
35-
} else {
36-
log.Fatalln("Consumer was closed before offsets could be marked.")
37-
}
38-
}
39-
40-
errHandler := func(e *error) {
41-
log.Fatalln((*e).Error())
42-
}
43-
4429
config := consumer.Config{
4530
ConsumerGroup: "test",
46-
ErrHandler: errHandler,
4731
KafkaBrokers: []string{"localhost:9092"},
48-
MsgHandler: msgHandler,
4932
Topics: []string{"test"},
5033
}
5134

52-
proxyconsumer, _ := consumer.New(&config)
35+
proxyconsumer, err := consumer.New(&config)
36+
if err != nil {
37+
panic(err)
38+
}
5339
proxyconsumer.EnableLogging()
5440

55-
// Temporary hack for simplicity. Use channels/app-logic in actual application.
56-
time.Sleep(100000 * time.Millisecond)
41+
// Read Errors
42+
go func() {
43+
for err := proxyConsumer.Errors() {
44+
log.Println(err)
45+
}
46+
}()
47+
48+
// Read Messages
49+
go func() {
50+
for msg := proxyConsumer.Messages() {
51+
log.Println(msg)
52+
}
53+
}()
5754
```
5855

5956
#### Producer:
6057

6158
```Go
62-
errHandler := func(err *sarama.ProducerError) {
63-
errs := *err
64-
fmt.Println(errs.Error())
65-
}
6659
config := producer.Config{
67-
ErrHandler: errHandler,
6860
KafkaBrokers: []string{"localhost:9092"},
6961
}
7062
asyncProducer, err := producer.New(&config)
63+
if err != nil {
64+
panic(err)
65+
}
7166
asyncProducer.EnableLogging()
7267

68+
go func() {
69+
for err := asyncProducer.Errors() {
70+
log.Println(err)
71+
}
72+
}()
73+
7374
strTime := strconv.Itoa(int(time.Now().Unix()))
7475
msg := asyncProducer.CreateKeyMessage("test", strTime, "testValue")
7576

76-
if err != nil {
77-
panic(err)
78-
}
7977
input, _ := asyncProducer.Input()
80-
input <- msg
81-
82-
// Temporary hack for simplicity. Use channels/app-logic in actual application.
83-
time.Sleep(2000 * time.Millisecond)
78+
input <- msg // Produce message
8479
```
8580

8681
[0]: https://github.com/Shopify/sarama

consumer/consumer.go

Lines changed: 12 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,7 @@ type Adapter interface {
3333
// Config wraps configuration for consumer
3434
type Config struct {
3535
ConsumerGroup string
36-
ErrHandler func(*error)
3736
KafkaBrokers []string
38-
MsgHandler func(*sarama.ConsumerMessage, *Consumer)
39-
NtfnHandler func(*cluster.Notification)
4037
// Allow overwriting default sarama-config
4138
SaramaConfig *cluster.Config
4239
Topics []string
@@ -49,11 +46,9 @@ type Consumer struct {
4946
isLoggingEnabled bool
5047
}
5148

52-
// To facilitate testing. This var gets overwritten by custon
53-
// init function.
54-
// We don't pass the init function as argument or
55-
// via dependency-injection because the purpose of
56-
// this library is to highly abstract the kafka configs
49+
// To facilitate testing. This var gets overwritten by custom init function.
50+
// We don't pass the init function as argument or via dependency-injection
51+
// because the purpose of this library is to abstract the kafka configs.
5752
var initFunc func([]string, string, []string, *cluster.Config) (*cluster.Consumer, error)
5853

5954
func init() {
@@ -75,7 +70,6 @@ func New(initConfig *Config) (*Consumer, error) {
7570
config.Consumer.Offsets.Initial = sarama.OffsetNewest
7671
config.Consumer.MaxProcessingTime = 10 * time.Second
7772
config.Consumer.Return.Errors = true
78-
config.Group.Return.Notifications = true
7973
}
8074

8175
consumer, err := initFunc(initConfig.KafkaBrokers, initConfig.ConsumerGroup, initConfig.Topics, config)
@@ -91,16 +85,12 @@ func New(initConfig *Config) (*Consumer, error) {
9185
isLoggingEnabled: false,
9286
}
9387

94-
// Don't run these functions when mocking consumer,
95-
// where initial consumer is nil.
88+
// Don't run this function when mocking consumer, where
89+
// initial consumer is nil.
9690
// This initialization is controlled by mock consumer.
9791
if consumer != nil {
9892
proxyConsumer.handleKeyInterrupt()
99-
proxyConsumer.handleErrors(initConfig.ErrHandler)
100-
proxyConsumer.handleMessages(initConfig.MsgHandler)
101-
proxyConsumer.handleNotifications(initConfig.NtfnHandler)
10293
}
103-
// log.Println("Consumer waiting for messages.")
10494
return &proxyConsumer, nil
10595
}
10696

@@ -130,51 +120,20 @@ func (c *Consumer) handleKeyInterrupt() {
130120
// Elegant exit
131121
go func() {
132122
<-sigChan
133-
log.Println("Keyboard-Interrupt signal received.")
123+
log.Println("Keyboard-Interrupt signal received, cleaning up before closing")
134124
closeError := <-c.Close()
135125
log.Println(closeError)
136126
}()
137127
}
138128

139-
func (c *Consumer) handleErrors(errHandler func(*error)) {
140-
consumer := c.SaramaConsumerGroup()
141-
go func() {
142-
for err := range consumer.Errors() {
143-
if c.isLoggingEnabled {
144-
log.Fatalln("Failed to read messages from topic:", err)
145-
}
146-
if errHandler != nil {
147-
errHandler(&err)
148-
}
149-
}
150-
}()
129+
// Errors returns the error-channel for Consumer.
130+
func (c *Consumer) Errors() <-chan error {
131+
return c.consumer.Errors()
151132
}
152133

153-
func (c *Consumer) handleMessages(msgHandler func(*sarama.ConsumerMessage, *Consumer)) {
154-
consumer := c.SaramaConsumerGroup()
155-
go func() {
156-
for message := range consumer.Messages() {
157-
if c.isLoggingEnabled {
158-
log.Printf("Topic: %s\t Partition: %v\t Offset: %v\n", message.Topic, message.Partition, message.Offset)
159-
}
160-
msgHandler(message, c)
161-
}
162-
}()
163-
}
164-
165-
// Consumer-Rebalancing notifications
166-
func (c *Consumer) handleNotifications(ntfnHandler func(*cluster.Notification)) {
167-
consumer := c.SaramaConsumerGroup()
168-
go func() {
169-
for ntf := range consumer.Notifications() {
170-
if c.isLoggingEnabled {
171-
log.Printf("Rebalanced: %+v\n", ntf)
172-
}
173-
if ntfnHandler != nil {
174-
ntfnHandler(ntf)
175-
}
176-
}
177-
}()
134+
// Messages returns the messages-channel for Consumer.
135+
func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage {
136+
return c.consumer.Messages()
178137
}
179138

180139
// Close attempts to close the consumer,

0 commit comments

Comments
 (0)