Improve provider generation and enable env-var configuration

pecigonzalo committed Oct 24, 2022
1 parent e4bb594 commit 0074ba1

Showing 2 changed files with 108 additions and 15 deletions.
docs/index.md (2 changes: 1 addition & 1 deletion)

@@ -42,7 +42,7 @@ provider "kafka" {
### Optional

- `sasl` (Attributes) SASL Authentication (see [below for nested schema](#nestedatt--sasl))
- `timeout` (Number) Timeout for provider operations (default: 300)
- `timeout` (Number) Timeout for provider operations in seconds (default: 300)
- `tls` (Attributes) TLS Configuration (see [below for nested schema](#nestedatt--tls))

<a id="nestedatt--sasl"></a>
internal/provider/provider.go (121 changes: 107 additions & 14 deletions)

@@ -4,15 +4,18 @@ import (
"context"
"fmt"
"os"
"strconv"
"strings"
"time"

"github.com/hashicorp/terraform-plugin-framework/datasource"
"github.com/hashicorp/terraform-plugin-framework/diag"
"github.com/hashicorp/terraform-plugin-framework/path"
"github.com/hashicorp/terraform-plugin-framework/provider"
"github.com/hashicorp/terraform-plugin-framework/resource"
"github.com/hashicorp/terraform-plugin-framework/tfsdk"
"github.com/hashicorp/terraform-plugin-framework/types"
"github.com/hashicorp/terraform-plugin-log/tflog"
"github.com/segmentio/topicctl/pkg/admin"
)

@@ -134,14 +137,57 @@ func (p *KafkaProvider) Configure(ctx context.Context, req provider.ConfigureReq
return
}

envVarPrefix := strings.ToUpper(p.typeName)

if len(config.BootstrapServers) >= 1 && config.BootstrapServers[0].IsUnknown() {
resp.Diagnostics.AddAttributeError(
path.Root("bootstrap_servers"),
"Unknown Kakfa bootstrap servers",
"The provider cannot create the Kafka client as there is an unknown configuration value. "+
fmt.Sprintf("Either target apply the source of the value first, set the value statically in the configuration, or use the %s_BOOTSTRAP_SERVERS environment variable.", envVarPrefix),
)
}

if config.SASL.Username.IsUnknown() {
resp.Diagnostics.AddAttributeError(
path.Root("sasl.username"),
"Unknown Kafka SASL username",
"The provider cannot create the Kafka client as there is an unknown configuration value for the SASL username. "+
fmt.Sprintf("Either target apply the source of the value first, set the value statically in the configuration, or use the %s_SASL_USERNAMAE environment variable.", envVarPrefix),
)
}
if config.SASL.Password.IsUnknown() {
resp.Diagnostics.AddAttributeError(
path.Root("sasl.password"),
"Unknown Kafka SASL password",
"The provider cannot create the Kafka client as there is an unknown configuration value for the SASL password. "+
fmt.Sprintf("Either target apply the source of the value first, set the value statically in the configuration, or use the %s_SASL_PASSWORD environment variable.", envVarPrefix),
)
}

if resp.Diagnostics.HasError() {
return
}

var brokerConfig admin.BrokerAdminClientConfig

// Configure TLS settings
brokerConfig.TLS.Enabled = config.TLS.Enabled.Value
brokerConfig.TLS.SkipVerify = config.TLS.SkipVerify.Value
// Bootstrap servers
bootstrapServersString := p.getEnv("BOOTSTRAP_SERVERS", "localhost:9092")
bootstrapServers := strings.Split(bootstrapServersString, ",")
bootstrapServer := bootstrapServers[0] // Select the first server in the list
if len(config.BootstrapServers) > 0 {
bootstrapServer = config.BootstrapServers[0].Value
}
// We only require one server
brokerConfig.BrokerAddr = bootstrapServer
tflog.SetField(ctx, "kafka_bootstrap_server", boostrapServer)

// Configure SASL if enabled
if config.SASL.Enabled.Value {
// SASL configuration
saslConfigEnabled := p.getEnvBool("SASL_ENABLED", true)
if !config.SASL.Enabled.IsNull() {
saslConfigEnabled = config.SASL.Enabled.Value
}
if saslConfigEnabled {
saslConfig, err := p.generateSASLConfig(ctx, config.SASL, resp)
if err != nil {
resp.Diagnostics.AddError("Unable to create Kafka client", err.Error())
@@ -150,14 +196,24 @@ func (p *KafkaProvider) Configure(ctx context.Context, req provider.ConfigureReq
brokerConfig.SASL = saslConfig
}

dafaultTimeout := time.Second * time.Duration(config.Timeout.Value)
// Configure TLS settings
brokerConfig.TLS.Enabled = config.TLS.Enabled.Value
brokerConfig.TLS.SkipVerify = config.TLS.SkipVerify.Value

// Configure timeout
defaultTimeout := int64(p.getEnvInt("TIMEOUT", 300))
if !config.Timeout.IsNull() {
defaultTimeout = config.Timeout.Value
}
kafkaClientTimeout := time.Second * time.Duration(defaultTimeout)

tflog.Debug(ctx, "Creating Kafka client")
brokerConfig.ReadOnly = true
dataSourceClient, err := admin.NewBrokerAdminClient(
ctx,
brokerConfig,
)
dataSourceClient.GetConnector().KafkaClient.Timeout = time.Duration(dafaultTimeout)
dataSourceClient.GetConnector().KafkaClient.Timeout = time.Duration(kafkaClientTimeout)
if err != nil {
resp.Diagnostics.AddError("Unable to create Kafka client",
"An unexpected error occurred when creating the Kafka client "+
@@ -171,39 +227,50 @@ func (p *KafkaProvider) Configure(ctx context.Context, req provider.ConfigureReq
ctx,
brokerConfig,
)
resourceClient.GetConnector().KafkaClient.Timeout = time.Duration(dafaultTimeout)
resourceClient.GetConnector().KafkaClient.Timeout = time.Duration(kafkaClientTimeout)
if err != nil {
resp.Diagnostics.AddError("Unable to create Kafka client",
"An unexpected error occurred when creating the Kafka client "+
"Kafka Error: "+err.Error())
return
}
resp.ResourceData = resourceClient
tflog.Info(ctx, "Configured Kafka client", map[string]any{"success": true})
}

// generateSASLConfig returns an admin.SASLConfig or an error for the given SASLConfigModel
func (p *KafkaProvider) generateSASLConfig(ctx context.Context, sasl SASLConfigModel, resp *provider.ConfigureResponse) (admin.SASLConfig, error) {

saslMechanism := p.getEnv("SASL_MECHANISM", "aws-msk-iam")
if !sasl.Mechanism.IsNull() {
saslMechanism = sasl.Mechanism.Value
}
saslUsername := p.getEnv("SASL_USERNAME", "")
if !sasl.Username.IsNull() {
saslUsername = sasl.Username.Value
}
saslPassword := p.getEnv("SASL_PASSWORD", "")
if !sasl.Password.IsNull() {
saslPassword = sasl.Password.Value
}

switch admin.SASLMechanism(saslMechanism) {
case admin.SASLMechanismScramSHA512, admin.SASLMechanismScramSHA256, admin.SASLMechanismPlain:
tflog.SetField(ctx, "kafka_sasl_username", saslUsername)
tflog.SetField(ctx, "kafka_sasl_password", saslPassword)
tflog.MaskFieldValuesWithFieldKeys(ctx, "kafka_sasl_password")
return admin.SASLConfig{
Enabled: sasl.Enabled.Value,
Enabled: true,
Mechanism: admin.SASLMechanism(saslMechanism),
Username: sasl.Username.Value,
Password: sasl.Password.Value,
Username: saslUsername,
Password: saslPassword,
}, nil
case admin.SASLMechanismAWSMSKIAM:
return admin.SASLConfig{
Enabled: sasl.Enabled.Value,
Enabled: true,
Mechanism: admin.SASLMechanismAWSMSKIAM,
Username: sasl.Username.Value,
Password: sasl.Password.Value,
}, nil
}
return admin.SASLConfig{}, fmt.Errorf("unable to detect SASL mechanism: %s", saslMechanism)
@@ -237,3 +304,29 @@ func (p *KafkaProvider) getEnv(key, fallback string) string {
}
return fallback
}

func (p *KafkaProvider) getEnvInt(key string, fallback int) int {
envVar := p.getEnv(key, "")
if envVar == "" {
return fallback
}

result, err := strconv.Atoi(envVar)
if err != nil {
return fallback
}
return result
}

func (p *KafkaProvider) getEnvBool(key string, fallback bool) bool {
envVar := p.getEnv(key, "")
if envVar == "" {
return fallback
}

result, err := strconv.ParseBool(envVar)
if err != nil {
return fallback
}
return result
}
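
The new getEnv, getEnvInt, and getEnvBool helpers give each provider setting a three-step precedence: an explicit value in the provider configuration wins, otherwise the corresponding environment variable is consulted, and a built-in default is the last resort. Below is a minimal standalone sketch of that pattern; the envIntOr helper, the EXAMPLE_TIMEOUT variable name, and the zero-means-unset convention are illustrative assumptions, not the provider's actual code.

package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// envIntOr returns the integer value of the named environment variable,
// falling back to the given default when the variable is unset or not a
// valid integer, mirroring the behaviour of the getEnvInt helper above.
func envIntOr(key string, fallback int) int {
	raw, ok := os.LookupEnv(key)
	if !ok || raw == "" {
		return fallback
	}
	parsed, err := strconv.Atoi(raw)
	if err != nil {
		return fallback
	}
	return parsed
}

func main() {
	// Hypothetical variable name, for illustration only.
	os.Setenv("EXAMPLE_TIMEOUT", "120")

	// Precedence: explicit configuration > environment variable > default.
	configuredTimeout := 0 // zero stands in for "not set in the provider block"
	timeoutSeconds := envIntOr("EXAMPLE_TIMEOUT", 300)
	if configuredTimeout != 0 {
		timeoutSeconds = configuredTimeout
	}

	fmt.Println(time.Second * time.Duration(timeoutSeconds)) // prints 2m0s
}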
