From babacd994b00ca9c75d2a251ae79db7d19baa9ac Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Tue, 17 Sep 2024 17:52:21 +0100 Subject: [PATCH 1/2] Add boolean config to enable TLS --- .../sources/kafka-full-example.hcl | 3 ++ .../targets/http-full-example.hcl | 3 ++ .../targets/kafka-full-example.hcl | 3 ++ pkg/source/kafka/kafka_source.go | 11 +++---- pkg/target/http.go | 18 ++++++++---- pkg/target/http_oauth2_test.go | 2 +- pkg/target/http_test.go | 29 ++++++++++--------- pkg/target/kafka.go | 9 +++--- 8 files changed, 49 insertions(+), 29 deletions(-) diff --git a/assets/docs/configuration/sources/kafka-full-example.hcl b/assets/docs/configuration/sources/kafka-full-example.hcl index 00ce3cfb..f13ef823 100644 --- a/assets/docs/configuration/sources/kafka-full-example.hcl +++ b/assets/docs/configuration/sources/kafka-full-example.hcl @@ -34,6 +34,9 @@ source { # The SASL Algorithm to use: "plaintext", "sha512" or "sha256" (default: "sha512") sasl_algorithm = "sha256" + # Whether to enable TLS + enable_tls = true + # The optional certificate file for client authentication cert_file = "myLocalhost.crt" diff --git a/assets/docs/configuration/targets/http-full-example.hcl b/assets/docs/configuration/targets/http-full-example.hcl index f2b4e2b4..9a8f8a64 100644 --- a/assets/docs/configuration/targets/http-full-example.hcl +++ b/assets/docs/configuration/targets/http-full-example.hcl @@ -32,6 +32,9 @@ target { # you could also reference an environment variable. basic_auth_password = env.MY_AUTH_PASSWORD + # Whether to enable TLS + enable_tls = true + # The optional certificate file for client authentication cert_file = "myLocalhost.crt" diff --git a/assets/docs/configuration/targets/kafka-full-example.hcl b/assets/docs/configuration/targets/kafka-full-example.hcl index 41df612d..61f42ead 100644 --- a/assets/docs/configuration/targets/kafka-full-example.hcl +++ b/assets/docs/configuration/targets/kafka-full-example.hcl @@ -38,6 +38,9 @@ target { # The SASL Algorithm to use: "plaintext", "sha512" or "sha256" (default: "sha512") sasl_algorithm = "sha256" + # Whether to enable TLS + enable_tls = true + # The optional certificate file for client authentication cert_file = "myLocalhost.crt" diff --git a/pkg/source/kafka/kafka_source.go b/pkg/source/kafka/kafka_source.go index cd63380e..b975873b 100644 --- a/pkg/source/kafka/kafka_source.go +++ b/pkg/source/kafka/kafka_source.go @@ -42,6 +42,7 @@ type Configuration struct { SASLUsername string `hcl:"sasl_username,optional" ` SASLPassword string `hcl:"sasl_password,optional"` SASLAlgorithm string `hcl:"sasl_algorithm,optional"` + EnableTLS bool `hcl:"enable_tls,optional"` CertFile string `hcl:"cert_file,optional"` KeyFile string `hcl:"key_file,optional"` CaFile string `hcl:"ca_file,optional"` @@ -50,7 +51,6 @@ type Configuration struct { // kafkaSource holds a new client for reading messages from Apache Kafka type kafkaSource struct { - config *sarama.Config concurrentWrites int topic string brokers string @@ -197,6 +197,7 @@ func (f adapter) ProvideDefault() (interface{}, error) { Assignor: "range", SASLAlgorithm: "sha512", ConcurrentWrites: 15, + EnableTLS: false, } return cfg, nil @@ -259,14 +260,14 @@ func newKafkaSource(cfg *Configuration) (*kafkaSource, error) { } } + // returns nil, nil if provided empty certs tlsConfig, err := common.CreateTLSConfiguration(cfg.CertFile, cfg.KeyFile, cfg.CaFile, cfg.SkipVerifyTLS) if err != nil { return nil, err } - if tlsConfig != nil { - saramaConfig.Net.TLS.Config = tlsConfig - saramaConfig.Net.TLS.Enable = true - } + + saramaConfig.Net.TLS.Enable = cfg.EnableTLS + saramaConfig.Net.TLS.Config = tlsConfig client, err := sarama.NewConsumerGroup(strings.Split(cfg.Brokers, ","), fmt.Sprintf(`%s-%s`, cfg.ConsumerName, cfg.TopicName), saramaConfig) if err != nil { diff --git a/pkg/target/http.go b/pkg/target/http.go index 6ef2b798..5ea521f2 100644 --- a/pkg/target/http.go +++ b/pkg/target/http.go @@ -42,11 +42,13 @@ type HTTPTargetConfig struct { Headers string `hcl:"headers,optional"` BasicAuthUsername string `hcl:"basic_auth_username,optional"` BasicAuthPassword string `hcl:"basic_auth_password,optional"` - CertFile string `hcl:"cert_file,optional"` - KeyFile string `hcl:"key_file,optional"` - CaFile string `hcl:"ca_file,optional"` - SkipVerifyTLS bool `hcl:"skip_verify_tls,optional"` // false - DynamicHeaders bool `hcl:"dynamic_headers,optional"` + + EnableTLS bool `hcl:"enable_tls,optional"` + CertFile string `hcl:"cert_file,optional"` + KeyFile string `hcl:"key_file,optional"` + CaFile string `hcl:"ca_file,optional"` + SkipVerifyTLS bool `hcl:"skip_verify_tls,optional"` // false + DynamicHeaders bool `hcl:"dynamic_headers,optional"` OAuth2ClientID string `hcl:"oauth2_client_id,optional"` OAuth2ClientSecret string `hcl:"oauth2_client_secret,optional"` @@ -146,6 +148,7 @@ func newHTTPTarget( headers string, basicAuthUsername string, basicAuthPassword string, + enableTLS bool, certFile string, keyFile string, caFile string, @@ -171,7 +174,8 @@ func newHTTPTarget( if err2 != nil { return nil, err2 } - if tlsConfig != nil { + + if enableTLS && tlsConfig != nil { transport.TLSClientConfig = tlsConfig } @@ -273,6 +277,7 @@ func HTTPTargetConfigFunction(c *HTTPTargetConfig) (*HTTPTarget, error) { c.Headers, c.BasicAuthUsername, c.BasicAuthPassword, + c.EnableTLS, c.CertFile, c.KeyFile, c.CaFile, @@ -304,6 +309,7 @@ func (f HTTPTargetAdapter) ProvideDefault() (interface{}, error) { RequestMaxMessages: 20, RequestByteLimit: 1048576, MessageByteLimit: 1048576, + EnableTLS: false, RequestTimeoutInSeconds: 5, ContentType: "application/json", diff --git a/pkg/target/http_oauth2_test.go b/pkg/target/http_oauth2_test.go index bbc9b08a..a96aa7c1 100644 --- a/pkg/target/http_oauth2_test.go +++ b/pkg/target/http_oauth2_test.go @@ -120,7 +120,7 @@ func runTest(t *testing.T, inputClientID string, inputClientSecret string, input } func oauth2Target(t *testing.T, targetURL string, inputClientID string, inputClientSecret string, inputRefreshToken string, tokenServerURL string) *HTTPTarget { - target, err := newHTTPTarget(targetURL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, inputClientID, inputClientSecret, inputRefreshToken, tokenServerURL, "", defaultResponseRules()) + target, err := newHTTPTarget(targetURL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, inputClientID, inputClientSecret, inputRefreshToken, tokenServerURL, "", defaultResponseRules()) if err != nil { t.Fatal(err) } diff --git a/pkg/target/http_test.go b/pkg/target/http_test.go index 49fd82bc..f7341f5c 100644 --- a/pkg/target/http_test.go +++ b/pkg/target/http_test.go @@ -308,12 +308,12 @@ func TestHTTP_AddHeadersToRequest_WithDynamicHeaders(t *testing.T) { func TestHTTP_NewHTTPTarget(t *testing.T) { assert := assert.New(t) - httpTarget, err := newHTTPTarget("http://something", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) + httpTarget, err := newHTTPTarget("http://something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) assert.Nil(err) assert.NotNil(httpTarget) - failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) + failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) assert.NotNil(err1) if err1 != nil { @@ -321,7 +321,7 @@ func TestHTTP_NewHTTPTarget(t *testing.T) { } assert.Nil(failedHTTPTarget) - failedHTTPTarget2, err2 := newHTTPTarget("", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) + failedHTTPTarget2, err2 := newHTTPTarget("", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) assert.NotNil(err2) if err2 != nil { assert.Equal("Invalid url for HTTP target: ''", err2.Error()) @@ -349,7 +349,7 @@ func TestHTTP_Write_Simple(t *testing.T) { server := createTestServerWithResponseCode(&results, tt.ResponseCode, "") defer server.Close() - target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) + target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) if err != nil { t.Fatal(err) } @@ -413,7 +413,7 @@ func TestHTTP_Write_Batched(t *testing.T) { server := createTestServerWithResponseCode(&results, 200, "") defer server.Close() - target, err := newHTTPTarget(server.URL, 5, tt.BatchSize, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) + target, err := newHTTPTarget(server.URL, 5, tt.BatchSize, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) if err != nil { t.Fatal(err) } @@ -472,7 +472,7 @@ func TestHTTP_Write_Concurrent(t *testing.T) { server := createTestServer(&results) defer server.Close() - target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) + target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) if err != nil { t.Fatal(err) } @@ -516,7 +516,7 @@ func TestHTTP_Write_Failure(t *testing.T) { server := createTestServer(&results) defer server.Close() - target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) + target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) if err != nil { t.Fatal(err) } @@ -558,7 +558,7 @@ func TestHTTP_Write_InvalidResponseCode(t *testing.T) { var results [][]byte server := createTestServerWithResponseCode(&results, tt.ResponseCode, "") defer server.Close() - target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) + target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) if err != nil { t.Fatal(err) } @@ -593,7 +593,7 @@ func TestHTTP_Write_Oversized(t *testing.T) { server := createTestServer(&results) defer server.Close() - target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) + target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules()) if err != nil { t.Fatal(err) } @@ -636,7 +636,7 @@ func TestHTTP_Write_EnabledTemplating(t *testing.T) { server := createTestServer(&results) defer server.Close() - target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), defaultResponseRules()) + target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), defaultResponseRules()) if err != nil { t.Fatal(err) } @@ -694,7 +694,7 @@ func TestHTTP_Write_Invalid(t *testing.T) { }, } - target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules) + target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules) if err != nil { t.Fatal(err) } @@ -722,7 +722,7 @@ func TestHTTP_Write_Setup(t *testing.T) { }, } - target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules) + target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules) if err != nil { t.Fatal(err) } @@ -757,6 +757,7 @@ func TestHTTP_Write_TLS(t *testing.T) { "", "", "", + true, string(`../../integration/http/localhost.crt`), string(`../../integration/http/localhost.key`), string(`../../integration/http/rootCA.crt`), @@ -798,6 +799,7 @@ func TestHTTP_Write_TLS(t *testing.T) { "", "", "", + true, string(`../../integration/http/localhost.crt`), string(`../../integration/http/localhost.key`), string(`../../integration/http/rootCA.crt`), @@ -832,6 +834,7 @@ func TestHTTP_Write_TLS(t *testing.T) { "", "", "", + false, "", "", "", @@ -997,7 +1000,7 @@ func TestHTTP_Write_GroupedRequests(t *testing.T) { defer server.Close() //dynamicHeaders enabled - target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, true, "", "", "", "", "", defaultResponseRules()) + target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, true, "", "", "", "", "", defaultResponseRules()) if err != nil { t.Fatal(err) } diff --git a/pkg/target/kafka.go b/pkg/target/kafka.go index 720e9b64..a936b4ed 100644 --- a/pkg/target/kafka.go +++ b/pkg/target/kafka.go @@ -38,6 +38,7 @@ type KafkaConfig struct { SASLUsername string `hcl:"sasl_username,optional"` SASLPassword string `hcl:"sasl_password,optional"` SASLAlgorithm string `hcl:"sasl_algorithm,optional"` + EnableTLS bool `hcl:"enable_tls,optional"` CertFile string `hcl:"cert_file,optional"` KeyFile string `hcl:"key_file,optional"` CaFile string `hcl:"ca_file,optional"` @@ -111,14 +112,13 @@ func NewKafkaTarget(cfg *KafkaConfig) (*KafkaTarget, error) { } } + // returns nil if certs are empty tlsConfig, err := common.CreateTLSConfiguration(cfg.CertFile, cfg.KeyFile, cfg.CaFile, cfg.SkipVerifyTLS) if err != nil { return nil, err } - if tlsConfig != nil { - saramaConfig.Net.TLS.Config = tlsConfig - saramaConfig.Net.TLS.Enable = true - } + saramaConfig.Net.TLS.Enable = cfg.EnableTLS + saramaConfig.Net.TLS.Config = tlsConfig var asyncResults chan *saramaResult = nil var asyncProducer sarama.AsyncProducer = nil @@ -187,6 +187,7 @@ func (f KafkaTargetAdapter) ProvideDefault() (interface{}, error) { MaxRetries: 10, ByteLimit: 1048576, SASLAlgorithm: "sha512", + EnableTLS: false, } return cfg, nil From 7c243678708a29b70500b5721b5e953c786a20b3 Mon Sep 17 00:00:00 2001 From: colmsnowplow Date: Tue, 17 Sep 2024 18:02:52 +0100 Subject: [PATCH 2/2] Fix config test --- config/component_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/config/component_test.go b/config/component_test.go index 964a3771..d95bdb03 100644 --- a/config/component_test.go +++ b/config/component_test.go @@ -83,6 +83,7 @@ func TestCreateTargetComponentHCL(t *testing.T) { Headers: "", BasicAuthUsername: "", BasicAuthPassword: "", + EnableTLS: false, CertFile: "", KeyFile: "", CaFile: "", @@ -110,6 +111,7 @@ func TestCreateTargetComponentHCL(t *testing.T) { OAuth2ClientSecret: "myClientSecret", OAuth2RefreshToken: "myRefreshToken", OAuth2TokenURL: "https://my.auth.server/token", + EnableTLS: true, CertFile: "myLocalhost.crt", KeyFile: "myLocalhost.key", CaFile: "myRootCA.crt", @@ -151,6 +153,7 @@ func TestCreateTargetComponentHCL(t *testing.T) { SASLUsername: "", SASLPassword: "", SASLAlgorithm: "sha512", + EnableTLS: false, CertFile: "", KeyFile: "", CaFile: "", @@ -177,6 +180,7 @@ func TestCreateTargetComponentHCL(t *testing.T) { SASLUsername: "mySaslUsername", SASLPassword: "mySASLPassword", SASLAlgorithm: "sha256", + EnableTLS: true, CertFile: "myLocalhost.crt", KeyFile: "myLocalhost.key", CaFile: "myRootCA.crt",