diff --git a/docs/index.md b/docs/index.md index dc95d35..1df01b4 100644 --- a/docs/index.md +++ b/docs/index.md @@ -42,7 +42,7 @@ provider "kafka" { ### Optional - `sasl` (Attributes) SASL Authentication (see [below for nested schema](#nestedatt--sasl)) -- `timeout` (Number) Timeout for provider operations (default: 300) +- `timeout` (Number) Timeout for provider operations in seconds (default: 300) - `tls` (Attributes) TLS Configuration (see [below for nested schema](#nestedatt--tls)) diff --git a/internal/provider/provider.go b/internal/provider/provider.go index e3049ad..b78b3e7 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -4,15 +4,18 @@ import ( "context" "fmt" "os" + "strconv" "strings" "time" "github.com/hashicorp/terraform-plugin-framework/datasource" "github.com/hashicorp/terraform-plugin-framework/diag" + "github.com/hashicorp/terraform-plugin-framework/path" "github.com/hashicorp/terraform-plugin-framework/provider" "github.com/hashicorp/terraform-plugin-framework/resource" "github.com/hashicorp/terraform-plugin-framework/tfsdk" "github.com/hashicorp/terraform-plugin-framework/types" + "github.com/hashicorp/terraform-plugin-log/tflog" "github.com/segmentio/topicctl/pkg/admin" ) @@ -134,14 +137,57 @@ func (p *KafkaProvider) Configure(ctx context.Context, req provider.ConfigureReq return } + envVarPrefix := strings.ToUpper(p.typeName) + + if len(config.BootstrapServers) >= 1 && config.BootstrapServers[0].IsUnknown() { + resp.Diagnostics.AddAttributeError( + path.Root("bootstrap_servers"), + "Unknown Kakfa bootstrap servers", + "The provider cannot create the Kafka client as there is an unknown configuration value. "+ + fmt.Sprintf("Either target apply the source of the value first, set the value statically in the configuration, or use the %s_BOOTSTRAP_SERVERS environment variable.", envVarPrefix), + ) + } + + if config.SASL.Username.IsUnknown() { + resp.Diagnostics.AddAttributeError( + path.Root("sasl.username"), + "Unknown Kafka SASL username", + "The provider cannot create the Kafka client as there is an unknown configuration value for the SASL username. "+ + fmt.Sprintf("Either target apply the source of the value first, set the value statically in the configuration, or use the %s_SASL_USERNAMAE environment variable.", envVarPrefix), + ) + } + if config.SASL.Password.IsUnknown() { + resp.Diagnostics.AddAttributeError( + path.Root("sasl.password"), + "Unknown Kafka SASL password", + "The provider cannot create the Kafka client as there is an unknown configuration value for the SASL password. "+ + fmt.Sprintf("Either target apply the source of the value first, set the value statically in the configuration, or use the %s_SASL_PASSWORD environment variable.", envVarPrefix), + ) + } + + if resp.Diagnostics.HasError() { + return + } + var brokerConfig admin.BrokerAdminClientConfig - // Configure TLS settings - brokerConfig.TLS.Enabled = config.TLS.Enabled.Value - brokerConfig.TLS.SkipVerify = config.TLS.SkipVerify.Value + // Bootstrap servers + bootstrapServersString := p.getEnv("BOOTSTRAP_SERVERS", "localhost:9092") + boostrapServers := strings.Split(bootstrapServersString, ",") + boostrapServer := boostrapServers[0] // Select the first server on the list + if len(config.BootstrapServers) > 0 { + boostrapServer = config.BootstrapServers[0].Value + } + // We only require 1 server + brokerConfig.BrokerAddr = boostrapServer + tflog.SetField(ctx, "kafka_bootstrap_server", boostrapServer) - // Configure SASL if enabled - if config.SASL.Enabled.Value { + // SASL configuration + saslConfigEnabled := p.getEnvBool("SASL_ENABLED", true) + if !config.SASL.Enabled.IsNull() { + saslConfigEnabled = config.SASL.Enabled.Value + } + if saslConfigEnabled { saslConfig, err := p.generateSASLConfig(ctx, config.SASL, resp) if err != nil { resp.Diagnostics.AddError("Unable to create Kafka client", err.Error()) @@ -150,14 +196,24 @@ func (p *KafkaProvider) Configure(ctx context.Context, req provider.ConfigureReq brokerConfig.SASL = saslConfig } - dafaultTimeout := time.Second * time.Duration(config.Timeout.Value) + // Configure TLS settings + brokerConfig.TLS.Enabled = config.TLS.Enabled.Value + brokerConfig.TLS.SkipVerify = config.TLS.SkipVerify.Value + // Configure timeout + defaultTimeout := int64(p.getEnvInt("TIMEOUT", 300)) + if !config.Timeout.IsNull() { + defaultTimeout = config.Timeout.Value + } + kafkaClientTimeout := time.Second * time.Duration(defaultTimeout) + + tflog.Debug(ctx, "Creating Kafka client") brokerConfig.ReadOnly = true dataSourceClient, err := admin.NewBrokerAdminClient( ctx, brokerConfig, ) - dataSourceClient.GetConnector().KafkaClient.Timeout = time.Duration(dafaultTimeout) + dataSourceClient.GetConnector().KafkaClient.Timeout = time.Duration(kafkaClientTimeout) if err != nil { resp.Diagnostics.AddError("Unable to create Kafka client", "An unexpected error occurred when creating the Kafka client "+ @@ -171,7 +227,7 @@ func (p *KafkaProvider) Configure(ctx context.Context, req provider.ConfigureReq ctx, brokerConfig, ) - resourceClient.GetConnector().KafkaClient.Timeout = time.Duration(dafaultTimeout) + resourceClient.GetConnector().KafkaClient.Timeout = time.Duration(kafkaClientTimeout) if err != nil { resp.Diagnostics.AddError("Unable to create Kafka client", "An unexpected error occurred when creating the Kafka client "+ @@ -179,31 +235,42 @@ func (p *KafkaProvider) Configure(ctx context.Context, req provider.ConfigureReq return } resp.ResourceData = resourceClient + tflog.Info(ctx, "Configured Kafka client", map[string]any{"success": true}) } // generateSASLConfig returns a SASLConfig{} or an error given a SASLModel func (p *KafkaProvider) generateSASLConfig(ctx context.Context, sasl SASLConfigModel, resp *provider.ConfigureResponse) (admin.SASLConfig, error) { + saslMechanism := p.getEnv("SASL_MECHANISM", "aws-msk-iam") if !sasl.Mechanism.IsNull() { saslMechanism = sasl.Mechanism.Value } + saslUsername := p.getEnv("SASL_USERNAME", "") + if !sasl.Mechanism.IsNull() { + saslUsername = sasl.Username.Value + } + saslPassword := p.getEnv("SASL_PASSWORD", "") + if !sasl.Mechanism.IsNull() { + saslPassword = sasl.Password.Value + } switch admin.SASLMechanism(saslMechanism) { case admin.SASLMechanismScramSHA512: case admin.SASLMechanismScramSHA256: case admin.SASLMechanismPlain: + tflog.SetField(ctx, "kafka_sasl_username", saslUsername) + tflog.SetField(ctx, "kafka_sasl_password", saslPassword) + tflog.MaskFieldValuesWithFieldKeys(ctx, "kafka_sasl_password") return admin.SASLConfig{ - Enabled: sasl.Enabled.Value, + Enabled: true, Mechanism: admin.SASLMechanismScramSHA256, - Username: sasl.Username.Value, - Password: sasl.Password.Value, + Username: saslUsername, + Password: saslPassword, }, nil case admin.SASLMechanismAWSMSKIAM: return admin.SASLConfig{ - Enabled: sasl.Enabled.Value, + Enabled: true, Mechanism: admin.SASLMechanismAWSMSKIAM, - Username: sasl.Username.Value, - Password: sasl.Password.Value, }, nil } return admin.SASLConfig{}, fmt.Errorf("unable to detect SASL mechanism: %s", sasl.Mechanism.Value) @@ -237,3 +304,29 @@ func (p *KafkaProvider) getEnv(key, fallback string) string { } return fallback } + +func (p *KafkaProvider) getEnvInt(key string, fallback int) int { + envVar := p.getEnv(key, "") + if envVar == "" { + return fallback + } + + result, err := strconv.Atoi(envVar) + if err != nil { + return fallback + } + return result +} + +func (p *KafkaProvider) getEnvBool(key string, fallback bool) bool { + envVar := p.getEnv(key, "") + if envVar == "" { + return fallback + } + + result, err := strconv.ParseBool(envVar) + if err != nil { + return fallback + } + return result +}