@@ -17,6 +17,7 @@ import (
1717 "github.com/akto-api-security/mirroring-api-logging/trafficUtil/utils"
1818
1919 "github.com/segmentio/kafka-go"
20+ "github.com/segmentio/kafka-go/sasl/plain"
2021 "google.golang.org/protobuf/proto"
2122)
2223
@@ -29,12 +30,20 @@ var useTLS = false
2930var InsecureSkipVerify = true
3031var tlsCACertPath = "./ca.crt"
3132
33+ var isAuthImplemented = false
34+ var kafkaUsername = ""
35+ var kafkaPassword = ""
36+
3237func init () {
3338
3439 utils .InitVar ("USE_TLS" , & useTLS )
3540 utils .InitVar ("INSECURE_SKIP_VERIFY" , & InsecureSkipVerify )
3641 utils .InitVar ("TLS_CA_CERT_PATH" , & tlsCACertPath )
3742
43+ utils .InitVar ("IS_AUTH_IMPLEMENTED" , & isAuthImplemented )
44+ utils .InitVar ("KAFKA_USERNAME" , & kafkaUsername )
45+ utils .InitVar ("KAFKA_PASSWORD" , & kafkaPassword )
46+
3847}
3948
4049func InitKafka () {
@@ -306,11 +315,22 @@ func getKafkaWriter(kafkaURL string, batchSize int, batchTimeout time.Duration)
306315 Compression : kafka .Lz4 ,
307316 }
308317
318+ transport := & kafka.Transport {}
319+
309320 if useTLS {
310321 tlsConfig , _ := NewTLSConfig (tlsCACertPath )
311- kafkaWriter .Transport = & kafka.Transport {
312- TLS : tlsConfig ,
322+ transport .TLS = tlsConfig
323+ }
324+
325+ // Add SASL authentication if enabled
326+ if isAuthImplemented && kafkaUsername != "" && kafkaPassword != "" {
327+ slog .Info ("Configuring SASL plain authentication" , "username" , kafkaUsername )
328+ transport .SASL = plain.Mechanism {
329+ Username : kafkaUsername ,
330+ Password : kafkaPassword ,
313331 }
314332 }
333+
334+ kafkaWriter .Transport = transport
315335 return & kafkaWriter
316336}
0 commit comments