@@ -10,13 +10,26 @@ import (
1010
1111 "github.com/akto-api-security/api-gateway-logging/trafficUtil/utils"
1212 "github.com/segmentio/kafka-go"
13+ "github.com/segmentio/kafka-go/sasl/plain"
1314)
1415
1516var kafkaWriter * kafka.Writer
1617var KafkaErrMsgCount = 0
1718var KafkaErrMsgEpoch = time .Now ()
1819var BytesInThreshold = 500 * 1024 * 1024
1920
21+ var isAuthImplemented = false
22+ var kafkaUsername = ""
23+ var kafkaPassword = ""
24+
25+ func init () {
26+
27+ utils .InitVar ("IS_AUTH_IMPLEMENTED" , & isAuthImplemented )
28+ utils .InitVar ("KAFKA_USERNAME" , & kafkaUsername )
29+ utils .InitVar ("KAFKA_PASSWORD" , & kafkaPassword )
30+
31+ }
32+
2033func InitKafka () {
2134 kafka_url := os .Getenv ("AKTO_KAFKA_BROKER_MAL" )
2235
@@ -126,7 +139,7 @@ func Produce(ctx context.Context, message string) error {
126139}
127140
128141func getKafkaWriter (kafkaURL , topic string , batchSize int , batchTimeout time.Duration ) * kafka.Writer {
129- return & kafka.Writer {
142+ kafkaWriter := kafka.Writer {
130143 Addr : kafka .TCP (kafkaURL ),
131144 Topic : topic ,
132145 BatchSize : batchSize ,
@@ -136,4 +149,16 @@ func getKafkaWriter(kafkaURL, topic string, batchSize int, batchTimeout time.Dur
136149 WriteTimeout : batchTimeout ,
137150 Async : true ,
138151 }
152+
153+ transport := & kafka.Transport {}
154+
155+ if isAuthImplemented && kafkaUsername != "" && kafkaPassword != "" {
156+ transport .SASL = plain.Mechanism {
157+ Username : kafkaUsername ,
158+ Password : kafkaPassword ,
159+ }
160+ }
161+
162+ kafkaWriter .Transport = transport
163+ return & kafkaWriter
139164}
0 commit comments