diff --git a/plugins/extractors/kafka/kafka.go b/plugins/extractors/kafka/kafka.go index cd0f5ff4..ed3c0570 100644 --- a/plugins/extractors/kafka/kafka.go +++ b/plugins/extractors/kafka/kafka.go @@ -11,6 +11,7 @@ import ( "strings" "time" + "github.com/IBM/sarama" "github.com/raystack/meteor/models" v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2" "github.com/raystack/meteor/plugins" @@ -110,7 +111,6 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error { } consumerConfig := sarama.NewConfig() - if e.config.Auth.TLS.Enabled { tlsConfig, err := e.createTLSConfig() if err != nil { @@ -118,13 +118,14 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error { } consumerConfig.Net.TLS.Enable = true consumerConfig.Net.TLS.Config = tlsConfig + } - if e.config.Auth.SASL.Enabled { - consumerConfig.Net.SASL.Enable = true - if e.config.Auth.SASL.Mechanism == sarama.SASLTypeOAuth { - consumerConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth - consumerConfig.Net.SASL.TokenProvider = NewKubernetesTokenProvider() - } + if e.config.Auth.SASL.Enabled { + consumerConfig.Net.SASL.Enable = true + if e.config.Auth.SASL.Mechanism == sarama.SASLTypeOAuth { + consumerConfig.Net.SASL.Mechanism = sarama.SASLTypeOAuth + consumerConfig.Net.SASL.TokenProvider = NewKubernetesTokenProvider() + } } consumer, err := sarama.NewConsumer([]string{e.config.Broker}, consumerConfig) @@ -133,6 +134,7 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error { return fmt.Errorf("failed to create kafka consumer for brokers %s and config %+v. Error %s", e.config.Broker, consumerConfig, err.Error()) } + e.conn = consumer return nil } @@ -162,6 +164,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) ctx, time.Since(start).Milliseconds(), metric.WithAttributes(attributes...), ) }(time.Now()) + topics, err := e.conn.Topics() if err != nil { return fmt.Errorf("fetch topics: %w", err) @@ -200,11 +203,6 @@ func (e *Extractor) createTLSConfig() (*tls.Config, error) { }, nil } - cert, err := tls.LoadX509KeyPair(authConfig.CertFile, authConfig.KeyFile) - if err != nil { - return nil, fmt.Errorf("create cert: %w", err) - } - var cert tls.Certificate var err error if authConfig.CertFile != "" && authConfig.KeyFile != "" { @@ -214,6 +212,11 @@ func (e *Extractor) createTLSConfig() (*tls.Config, error) { } } + caCert, err := os.ReadFile(authConfig.CAFile) + if err != nil { + return nil, fmt.Errorf("read ca cert file: %w", err) + } + caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) @@ -231,7 +234,7 @@ func (e *Extractor) buildAsset(topicName string, numOfPartitions int) (*v1beta2. Profile: &v1beta2.TopicProfile{ NumberOfPartitions: int64(numOfPartitions), }, - Attributes: &structpb.Struct{}, + Attributes: &structpb.Struct{}, // ensure attributes don't get overwritten if present }) if err != nil { e.logger.Warn("error creating Any struct", "error", err) diff --git a/plugins/extractors/kafka/kafka_test.go b/plugins/extractors/kafka/kafka_test.go index f0f437bf..6abc643e 100644 --- a/plugins/extractors/kafka/kafka_test.go +++ b/plugins/extractors/kafka/kafka_test.go @@ -35,7 +35,6 @@ var ( func TestMain(m *testing.M) { var broker *kafkaLib.Broker - // setup test opts := dockertest.RunOptions{ Repository: "moeenz/docker-kafka-kraft", @@ -55,6 +54,7 @@ func TestMain(m *testing.M) { time.Sleep(30 * time.Second) conn, err := kafkaLib.NewClient([]string{brokerHost}, nil) if err != nil { + fmt.Printf("error creating client ") return } @@ -70,9 +70,9 @@ func TestMain(m *testing.M) { conn.Close() return } - return } + purgeContainer, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) @@ -86,8 +86,6 @@ func TestMain(m *testing.M) { // run tests code := m.Run() - conn.Close() - // purge container if err := purgeContainer(); err != nil { log.Fatal(err) @@ -179,7 +177,6 @@ func TestExtract(t *testing.T) { if err != nil { t.Fatal(err) } - emitter := mocks.NewEmitter() err = extr.Extract(ctx, emitter.Push) assert.NoError(t, err) @@ -226,12 +223,14 @@ func TestExtract(t *testing.T) { } func setup(broker *kafkaLib.Broker) (err error) { + // create client connection to create topics conn, err := kafkaLib.NewClient([]string{brokerHost}, nil) if err != nil { fmt.Printf("error creating client ") return } + defer conn.Close() // create topics @@ -241,6 +240,7 @@ func setup(broker *kafkaLib.Broker) (err error) { "meteor-test-topic-3": {NumPartitions: 1, ReplicationFactor: 1}, "__consumer_offsets": {NumPartitions: 1, ReplicationFactor: 1}, } + createTopicRequest := &kafkaLib.CreateTopicsRequest{TopicDetails: topicConfigs} _, err = broker.CreateTopics(createTopicRequest) if err != nil {