diff --git a/trafficUtil/kafkaUtil/kafka.go b/trafficUtil/kafkaUtil/kafka.go index 93e872d..acc6e77 100644 --- a/trafficUtil/kafkaUtil/kafka.go +++ b/trafficUtil/kafkaUtil/kafka.go @@ -17,6 +17,7 @@ import ( "github.com/akto-api-security/mirroring-api-logging/trafficUtil/utils" "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" "google.golang.org/protobuf/proto" ) @@ -29,12 +30,20 @@ var useTLS = false var InsecureSkipVerify = true var tlsCACertPath = "./ca.crt" +var isAuthImplemented = false +var kafkaUsername = "" +var kafkaPassword = "" + func init() { utils.InitVar("USE_TLS", &useTLS) utils.InitVar("INSECURE_SKIP_VERIFY", &InsecureSkipVerify) utils.InitVar("TLS_CA_CERT_PATH", &tlsCACertPath) + utils.InitVar("IS_AUTH_IMPLEMENTED", &isAuthImplemented) + utils.InitVar("KAFKA_USERNAME", &kafkaUsername) + utils.InitVar("KAFKA_PASSWORD", &kafkaPassword) + } func InitKafka() { @@ -306,11 +315,22 @@ func getKafkaWriter(kafkaURL string, batchSize int, batchTimeout time.Duration) Compression: kafka.Lz4, } + transport := &kafka.Transport{} + if useTLS { tlsConfig, _ := NewTLSConfig(tlsCACertPath) - kafkaWriter.Transport = &kafka.Transport{ - TLS: tlsConfig, + transport.TLS = tlsConfig + } + + // Add SASL authentication if enabled + if isAuthImplemented && kafkaUsername != "" && kafkaPassword != "" { + slog.Info("Configuring SASL plain authentication", "username", kafkaUsername) + transport.SASL = plain.Mechanism{ + Username: kafkaUsername, + Password: kafkaPassword, } } + + kafkaWriter.Transport = transport return &kafkaWriter }