-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathkafka.go
79 lines (68 loc) · 1.85 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package main
import (
"crypto/md5"
"encoding/hex"
"io"
"os"
"os/signal"
"time"
cluster "github.com/bsm/sarama-cluster"
log "github.com/thinkboy/log4go"
)
// Offset-handling durations. Neither constant is referenced in the visible
// part of this file; presumably they are consumed elsewhere in the package.
const (
	// OFFSETS_PROCESSING_TIMEOUT_SECONDS is ten seconds. Despite the _SECONDS
	// suffix the value is a time.Duration, not a bare count of seconds.
	OFFSETS_PROCESSING_TIMEOUT_SECONDS time.Duration = 10 * time.Second

	// OFFSETS_COMMIT_INTERVAL is ten seconds, expressed as a time.Duration.
	OFFSETS_COMMIT_INTERVAL time.Duration = 10 * time.Second
)
// md5password returns hex(md5(id + hex(md5(secret)))): the secret is hashed
// with MD5 and hex-encoded (lowercase), that digest string is appended to id,
// and the concatenation is hashed and hex-encoded again.
func md5password(id string, secret string) (password string) {
	// hexDigest hashes s with MD5 and renders the digest as lowercase hex.
	hexDigest := func(s string) string {
		hasher := md5.New()
		// hash.Hash writes are documented to never return an error.
		_, _ = io.WriteString(hasher, s)
		return hex.EncodeToString(hasher.Sum(nil))
	}

	return hexDigest(id + hexDigest(secret))
}
// InitKafka creates a consumer-group client for Conf.KafkaTopic and then
// blocks, passing each received message value to push and marking the message
// as processed. Consumer errors and group notifications are logged.
//
// It returns the error from cluster.NewConsumer if the consumer cannot be
// created. Otherwise it returns nil once an interrupt signal is received or
// the consumer's message channel has been closed.
func InitKafka() error {
	log.Info("start topic:%s consumer", Conf.KafkaTopic)
	log.Info("consumer group name:%s", Conf.Group)

	// init consumer
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	// SASL settings are taken from Conf; the password handed to the client is
	// the digest produced by md5password, not the raw configured secret.
	config.Net.SASL.Enable = Conf.SaslEnable
	config.Net.SASL.User = Conf.SaslUser
	config.Net.SASL.Password = md5password(Conf.SaslUser, Conf.SaslPassword)

	brokers := Conf.KafkaAddrs
	topics := []string{Conf.KafkaTopic}

	consumer, err := cluster.NewConsumer(brokers, Conf.Group, topics, config)
	if err != nil {
		// Report the failure through the declared error return instead of
		// panicking, so the caller decides how to react.
		return err
	}
	defer func() {
		if cerr := consumer.Close(); cerr != nil {
			log.Error("close consumer error: %s", cerr.Error())
		}
	}()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)
	defer signal.Stop(signals)

	// Hold the channels in locals so a closed one can be set to nil: receiving
	// from a nil channel blocks forever, which removes that case from the
	// select instead of letting it fire in a tight loop.
	messages := consumer.Messages()
	errs := consumer.Errors()
	notifications := consumer.Notifications()

	// consume messages, watch errors and notifications
	for {
		select {
		case msg, more := <-messages:
			if !more {
				// No further messages can arrive; stop instead of spinning.
				return nil
			}
			push(msg.Value)
			consumer.MarkOffset(msg, "") // mark message as processed
		case err, more := <-errs:
			if !more {
				errs = nil
				continue
			}
			log.Error("Error: %s\n", err.Error())
		case ntf, more := <-notifications:
			if !more {
				notifications = nil
				continue
			}
			log.Info("Info: %+v\n", ntf)
		case <-signals:
			return nil
		}
	}
}