Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add boolean config to enable TLS #370

Merged
merged 2 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions assets/docs/configuration/sources/kafka-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ source {
# The SASL Algorithm to use: "plaintext", "sha512" or "sha256" (default: "sha512")
sasl_algorithm = "sha256"

# Whether to enable TLS
enable_tls = true

# The optional certificate file for client authentication
cert_file = "myLocalhost.crt"

Expand Down
3 changes: 3 additions & 0 deletions assets/docs/configuration/targets/http-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ target {
# you could also reference an environment variable.
basic_auth_password = env.MY_AUTH_PASSWORD

# Whether to enable TLS
enable_tls = true

# The optional certificate file for client authentication
cert_file = "myLocalhost.crt"

Expand Down
3 changes: 3 additions & 0 deletions assets/docs/configuration/targets/kafka-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ target {
# The SASL Algorithm to use: "plaintext", "sha512" or "sha256" (default: "sha512")
sasl_algorithm = "sha256"

# Whether to enable TLS
enable_tls = true

# The optional certificate file for client authentication
cert_file = "myLocalhost.crt"

Expand Down
4 changes: 4 additions & 0 deletions config/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ func TestCreateTargetComponentHCL(t *testing.T) {
Headers: "",
BasicAuthUsername: "",
BasicAuthPassword: "",
EnableTLS: false,
CertFile: "",
KeyFile: "",
CaFile: "",
Expand Down Expand Up @@ -110,6 +111,7 @@ func TestCreateTargetComponentHCL(t *testing.T) {
OAuth2ClientSecret: "myClientSecret",
OAuth2RefreshToken: "myRefreshToken",
OAuth2TokenURL: "https://my.auth.server/token",
EnableTLS: true,
CertFile: "myLocalhost.crt",
KeyFile: "myLocalhost.key",
CaFile: "myRootCA.crt",
Expand Down Expand Up @@ -151,6 +153,7 @@ func TestCreateTargetComponentHCL(t *testing.T) {
SASLUsername: "",
SASLPassword: "",
SASLAlgorithm: "sha512",
EnableTLS: false,
CertFile: "",
KeyFile: "",
CaFile: "",
Expand All @@ -177,6 +180,7 @@ func TestCreateTargetComponentHCL(t *testing.T) {
SASLUsername: "mySaslUsername",
SASLPassword: "mySASLPassword",
SASLAlgorithm: "sha256",
EnableTLS: true,
CertFile: "myLocalhost.crt",
KeyFile: "myLocalhost.key",
CaFile: "myRootCA.crt",
Expand Down
11 changes: 6 additions & 5 deletions pkg/source/kafka/kafka_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Configuration struct {
SASLUsername string `hcl:"sasl_username,optional" `
SASLPassword string `hcl:"sasl_password,optional"`
SASLAlgorithm string `hcl:"sasl_algorithm,optional"`
EnableTLS bool `hcl:"enable_tls,optional"`
CertFile string `hcl:"cert_file,optional"`
KeyFile string `hcl:"key_file,optional"`
CaFile string `hcl:"ca_file,optional"`
Expand All @@ -50,7 +51,6 @@ type Configuration struct {

// kafkaSource holds a new client for reading messages from Apache Kafka
type kafkaSource struct {
config *sarama.Config
concurrentWrites int
topic string
brokers string
Expand Down Expand Up @@ -197,6 +197,7 @@ func (f adapter) ProvideDefault() (interface{}, error) {
Assignor: "range",
SASLAlgorithm: "sha512",
ConcurrentWrites: 15,
EnableTLS: false,
}

return cfg, nil
Expand Down Expand Up @@ -259,14 +260,14 @@ func newKafkaSource(cfg *Configuration) (*kafkaSource, error) {
}
}

// returns nil, nil if provided empty certs
tlsConfig, err := common.CreateTLSConfiguration(cfg.CertFile, cfg.KeyFile, cfg.CaFile, cfg.SkipVerifyTLS)
if err != nil {
return nil, err
}
if tlsConfig != nil {
saramaConfig.Net.TLS.Config = tlsConfig
saramaConfig.Net.TLS.Enable = true
}

saramaConfig.Net.TLS.Enable = cfg.EnableTLS
saramaConfig.Net.TLS.Config = tlsConfig
pondzix marked this conversation as resolved.
Show resolved Hide resolved

client, err := sarama.NewConsumerGroup(strings.Split(cfg.Brokers, ","), fmt.Sprintf(`%s-%s`, cfg.ConsumerName, cfg.TopicName), saramaConfig)
if err != nil {
Expand Down
18 changes: 12 additions & 6 deletions pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ type HTTPTargetConfig struct {
Headers string `hcl:"headers,optional"`
BasicAuthUsername string `hcl:"basic_auth_username,optional"`
BasicAuthPassword string `hcl:"basic_auth_password,optional"`
CertFile string `hcl:"cert_file,optional"`
KeyFile string `hcl:"key_file,optional"`
CaFile string `hcl:"ca_file,optional"`
SkipVerifyTLS bool `hcl:"skip_verify_tls,optional"` // false
DynamicHeaders bool `hcl:"dynamic_headers,optional"`

EnableTLS bool `hcl:"enable_tls,optional"`
CertFile string `hcl:"cert_file,optional"`
KeyFile string `hcl:"key_file,optional"`
CaFile string `hcl:"ca_file,optional"`
SkipVerifyTLS bool `hcl:"skip_verify_tls,optional"` // false
DynamicHeaders bool `hcl:"dynamic_headers,optional"`

OAuth2ClientID string `hcl:"oauth2_client_id,optional"`
OAuth2ClientSecret string `hcl:"oauth2_client_secret,optional"`
Expand Down Expand Up @@ -146,6 +148,7 @@ func newHTTPTarget(
headers string,
basicAuthUsername string,
basicAuthPassword string,
enableTLS bool,
certFile string,
keyFile string,
caFile string,
Expand All @@ -171,7 +174,8 @@ func newHTTPTarget(
if err2 != nil {
return nil, err2
}
if tlsConfig != nil {

if enableTLS && tlsConfig != nil {
transport.TLSClientConfig = tlsConfig
}

Expand Down Expand Up @@ -273,6 +277,7 @@ func HTTPTargetConfigFunction(c *HTTPTargetConfig) (*HTTPTarget, error) {
c.Headers,
c.BasicAuthUsername,
c.BasicAuthPassword,
c.EnableTLS,
c.CertFile,
c.KeyFile,
c.CaFile,
Expand Down Expand Up @@ -304,6 +309,7 @@ func (f HTTPTargetAdapter) ProvideDefault() (interface{}, error) {
RequestMaxMessages: 20,
RequestByteLimit: 1048576,
MessageByteLimit: 1048576,
EnableTLS: false,

RequestTimeoutInSeconds: 5,
ContentType: "application/json",
Expand Down
2 changes: 1 addition & 1 deletion pkg/target/http_oauth2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func runTest(t *testing.T, inputClientID string, inputClientSecret string, input
}

func oauth2Target(t *testing.T, targetURL string, inputClientID string, inputClientSecret string, inputRefreshToken string, tokenServerURL string) *HTTPTarget {
target, err := newHTTPTarget(targetURL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, inputClientID, inputClientSecret, inputRefreshToken, tokenServerURL, "", defaultResponseRules())
target, err := newHTTPTarget(targetURL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, inputClientID, inputClientSecret, inputRefreshToken, tokenServerURL, "", defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down
29 changes: 16 additions & 13 deletions pkg/target/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,20 +308,20 @@ func TestHTTP_AddHeadersToRequest_WithDynamicHeaders(t *testing.T) {
func TestHTTP_NewHTTPTarget(t *testing.T) {
assert := assert.New(t)

httpTarget, err := newHTTPTarget("http://something", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
httpTarget, err := newHTTPTarget("http://something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())

assert.Nil(err)
assert.NotNil(httpTarget)

failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())

assert.NotNil(err1)
if err1 != nil {
assert.Equal("Invalid url for HTTP target: 'something'", err1.Error())
}
assert.Nil(failedHTTPTarget)

failedHTTPTarget2, err2 := newHTTPTarget("", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
failedHTTPTarget2, err2 := newHTTPTarget("", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
assert.NotNil(err2)
if err2 != nil {
assert.Equal("Invalid url for HTTP target: ''", err2.Error())
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestHTTP_Write_Simple(t *testing.T) {
server := createTestServerWithResponseCode(&results, tt.ResponseCode, "")
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -413,7 +413,7 @@ func TestHTTP_Write_Batched(t *testing.T) {
server := createTestServerWithResponseCode(&results, 200, "")
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, tt.BatchSize, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, tt.BatchSize, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestHTTP_Write_Concurrent(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -516,7 +516,7 @@ func TestHTTP_Write_Failure(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -558,7 +558,7 @@ func TestHTTP_Write_InvalidResponseCode(t *testing.T) {
var results [][]byte
server := createTestServerWithResponseCode(&results, tt.ResponseCode, "")
defer server.Close()
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -593,7 +593,7 @@ func TestHTTP_Write_Oversized(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -636,7 +636,7 @@ func TestHTTP_Write_EnabledTemplating(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -694,7 +694,7 @@ func TestHTTP_Write_Invalid(t *testing.T) {
},
}

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules)
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -722,7 +722,7 @@ func TestHTTP_Write_Setup(t *testing.T) {
},
}

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules)
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -757,6 +757,7 @@ func TestHTTP_Write_TLS(t *testing.T) {
"",
"",
"",
true,
string(`../../integration/http/localhost.crt`),
string(`../../integration/http/localhost.key`),
string(`../../integration/http/rootCA.crt`),
Expand Down Expand Up @@ -798,6 +799,7 @@ func TestHTTP_Write_TLS(t *testing.T) {
"",
"",
"",
true,
string(`../../integration/http/localhost.crt`),
string(`../../integration/http/localhost.key`),
string(`../../integration/http/rootCA.crt`),
Expand Down Expand Up @@ -832,6 +834,7 @@ func TestHTTP_Write_TLS(t *testing.T) {
"",
"",
"",
false,
"",
"",
"",
Expand Down Expand Up @@ -997,7 +1000,7 @@ func TestHTTP_Write_GroupedRequests(t *testing.T) {
defer server.Close()

//dynamicHeaders enabled
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, true, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, true, "", "", "", "", "", defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/target/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type KafkaConfig struct {
SASLUsername string `hcl:"sasl_username,optional"`
SASLPassword string `hcl:"sasl_password,optional"`
SASLAlgorithm string `hcl:"sasl_algorithm,optional"`
EnableTLS bool `hcl:"enable_tls,optional"`
CertFile string `hcl:"cert_file,optional"`
KeyFile string `hcl:"key_file,optional"`
CaFile string `hcl:"ca_file,optional"`
Expand Down Expand Up @@ -111,14 +112,13 @@ func NewKafkaTarget(cfg *KafkaConfig) (*KafkaTarget, error) {
}
}

// returns nil if certs are empty
tlsConfig, err := common.CreateTLSConfiguration(cfg.CertFile, cfg.KeyFile, cfg.CaFile, cfg.SkipVerifyTLS)
if err != nil {
return nil, err
}
if tlsConfig != nil {
saramaConfig.Net.TLS.Config = tlsConfig
saramaConfig.Net.TLS.Enable = true
}
saramaConfig.Net.TLS.Enable = cfg.EnableTLS
saramaConfig.Net.TLS.Config = tlsConfig

var asyncResults chan *saramaResult = nil
var asyncProducer sarama.AsyncProducer = nil
Expand Down Expand Up @@ -187,6 +187,7 @@ func (f KafkaTargetAdapter) ProvideDefault() (interface{}, error) {
MaxRetries: 10,
ByteLimit: 1048576,
SASLAlgorithm: "sha512",
EnableTLS: false,
}

return cfg, nil
Expand Down
Loading