Skip to content

Commit

Permalink
Add boolean config to enable TLS
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Sep 17, 2024
1 parent 45c34bb commit babacd9
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 29 deletions.
3 changes: 3 additions & 0 deletions assets/docs/configuration/sources/kafka-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ source {
# The SASL Algorithm to use: "plaintext", "sha512" or "sha256" (default: "sha512")
sasl_algorithm = "sha256"

# Whether to enable TLS
enable_tls = true

# The optional certificate file for client authentication
cert_file = "myLocalhost.crt"

Expand Down
3 changes: 3 additions & 0 deletions assets/docs/configuration/targets/http-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ target {
# you could also reference an environment variable.
basic_auth_password = env.MY_AUTH_PASSWORD

# Whether to enable TLS
enable_tls = true

# The optional certificate file for client authentication
cert_file = "myLocalhost.crt"

Expand Down
3 changes: 3 additions & 0 deletions assets/docs/configuration/targets/kafka-full-example.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ target {
# The SASL Algorithm to use: "plaintext", "sha512" or "sha256" (default: "sha512")
sasl_algorithm = "sha256"

# Whether to enable TLS
enable_tls = true

# The optional certificate file for client authentication
cert_file = "myLocalhost.crt"

Expand Down
11 changes: 6 additions & 5 deletions pkg/source/kafka/kafka_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Configuration struct {
SASLUsername string `hcl:"sasl_username,optional" `
SASLPassword string `hcl:"sasl_password,optional"`
SASLAlgorithm string `hcl:"sasl_algorithm,optional"`
EnableTLS bool `hcl:"enable_tls,optional"`
CertFile string `hcl:"cert_file,optional"`
KeyFile string `hcl:"key_file,optional"`
CaFile string `hcl:"ca_file,optional"`
Expand All @@ -50,7 +51,6 @@ type Configuration struct {

// kafkaSource holds a new client for reading messages from Apache Kafka
type kafkaSource struct {
config *sarama.Config
concurrentWrites int
topic string
brokers string
Expand Down Expand Up @@ -197,6 +197,7 @@ func (f adapter) ProvideDefault() (interface{}, error) {
Assignor: "range",
SASLAlgorithm: "sha512",
ConcurrentWrites: 15,
EnableTLS: false,
}

return cfg, nil
Expand Down Expand Up @@ -259,14 +260,14 @@ func newKafkaSource(cfg *Configuration) (*kafkaSource, error) {
}
}

// returns nil, nil if provided empty certs
tlsConfig, err := common.CreateTLSConfiguration(cfg.CertFile, cfg.KeyFile, cfg.CaFile, cfg.SkipVerifyTLS)
if err != nil {
return nil, err
}
if tlsConfig != nil {
saramaConfig.Net.TLS.Config = tlsConfig
saramaConfig.Net.TLS.Enable = true
}

saramaConfig.Net.TLS.Enable = cfg.EnableTLS
saramaConfig.Net.TLS.Config = tlsConfig

client, err := sarama.NewConsumerGroup(strings.Split(cfg.Brokers, ","), fmt.Sprintf(`%s-%s`, cfg.ConsumerName, cfg.TopicName), saramaConfig)
if err != nil {
Expand Down
18 changes: 12 additions & 6 deletions pkg/target/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,13 @@ type HTTPTargetConfig struct {
Headers string `hcl:"headers,optional"`
BasicAuthUsername string `hcl:"basic_auth_username,optional"`
BasicAuthPassword string `hcl:"basic_auth_password,optional"`
CertFile string `hcl:"cert_file,optional"`
KeyFile string `hcl:"key_file,optional"`
CaFile string `hcl:"ca_file,optional"`
SkipVerifyTLS bool `hcl:"skip_verify_tls,optional"` // false
DynamicHeaders bool `hcl:"dynamic_headers,optional"`

EnableTLS bool `hcl:"enable_tls,optional"`
CertFile string `hcl:"cert_file,optional"`
KeyFile string `hcl:"key_file,optional"`
CaFile string `hcl:"ca_file,optional"`
SkipVerifyTLS bool `hcl:"skip_verify_tls,optional"` // false
DynamicHeaders bool `hcl:"dynamic_headers,optional"`

OAuth2ClientID string `hcl:"oauth2_client_id,optional"`
OAuth2ClientSecret string `hcl:"oauth2_client_secret,optional"`
Expand Down Expand Up @@ -146,6 +148,7 @@ func newHTTPTarget(
headers string,
basicAuthUsername string,
basicAuthPassword string,
enableTLS bool,
certFile string,
keyFile string,
caFile string,
Expand All @@ -171,7 +174,8 @@ func newHTTPTarget(
if err2 != nil {
return nil, err2
}
if tlsConfig != nil {

if enableTLS && tlsConfig != nil {
transport.TLSClientConfig = tlsConfig
}

Expand Down Expand Up @@ -273,6 +277,7 @@ func HTTPTargetConfigFunction(c *HTTPTargetConfig) (*HTTPTarget, error) {
c.Headers,
c.BasicAuthUsername,
c.BasicAuthPassword,
c.EnableTLS,
c.CertFile,
c.KeyFile,
c.CaFile,
Expand Down Expand Up @@ -304,6 +309,7 @@ func (f HTTPTargetAdapter) ProvideDefault() (interface{}, error) {
RequestMaxMessages: 20,
RequestByteLimit: 1048576,
MessageByteLimit: 1048576,
EnableTLS: false,

RequestTimeoutInSeconds: 5,
ContentType: "application/json",
Expand Down
2 changes: 1 addition & 1 deletion pkg/target/http_oauth2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func runTest(t *testing.T, inputClientID string, inputClientSecret string, input
}

func oauth2Target(t *testing.T, targetURL string, inputClientID string, inputClientSecret string, inputRefreshToken string, tokenServerURL string) *HTTPTarget {
target, err := newHTTPTarget(targetURL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, inputClientID, inputClientSecret, inputRefreshToken, tokenServerURL, "", defaultResponseRules())
target, err := newHTTPTarget(targetURL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, inputClientID, inputClientSecret, inputRefreshToken, tokenServerURL, "", defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down
29 changes: 16 additions & 13 deletions pkg/target/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,20 +308,20 @@ func TestHTTP_AddHeadersToRequest_WithDynamicHeaders(t *testing.T) {
func TestHTTP_NewHTTPTarget(t *testing.T) {
assert := assert.New(t)

httpTarget, err := newHTTPTarget("http://something", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
httpTarget, err := newHTTPTarget("http://something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())

assert.Nil(err)
assert.NotNil(httpTarget)

failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
failedHTTPTarget, err1 := newHTTPTarget("something", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())

assert.NotNil(err1)
if err1 != nil {
assert.Equal("Invalid url for HTTP target: 'something'", err1.Error())
}
assert.Nil(failedHTTPTarget)

failedHTTPTarget2, err2 := newHTTPTarget("", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
failedHTTPTarget2, err2 := newHTTPTarget("", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
assert.NotNil(err2)
if err2 != nil {
assert.Equal("Invalid url for HTTP target: ''", err2.Error())
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestHTTP_Write_Simple(t *testing.T) {
server := createTestServerWithResponseCode(&results, tt.ResponseCode, "")
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -413,7 +413,7 @@ func TestHTTP_Write_Batched(t *testing.T) {
server := createTestServerWithResponseCode(&results, 200, "")
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, tt.BatchSize, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, tt.BatchSize, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -472,7 +472,7 @@ func TestHTTP_Write_Concurrent(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -516,7 +516,7 @@ func TestHTTP_Write_Failure(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget("http://NonexistentEndpoint", 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -558,7 +558,7 @@ func TestHTTP_Write_InvalidResponseCode(t *testing.T) {
var results [][]byte
server := createTestServerWithResponseCode(&results, tt.ResponseCode, "")
defer server.Close()
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -593,7 +593,7 @@ func TestHTTP_Write_Oversized(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 1, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", "", defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -636,7 +636,7 @@ func TestHTTP_Write_EnabledTemplating(t *testing.T) {
server := createTestServer(&results)
defer server.Close()

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -694,7 +694,7 @@ func TestHTTP_Write_Invalid(t *testing.T) {
},
}

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules)
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -722,7 +722,7 @@ func TestHTTP_Write_Setup(t *testing.T) {
},
}

target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules)
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, false, "", "", "", "", string(`../../integration/http/template`), &responseRules)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -757,6 +757,7 @@ func TestHTTP_Write_TLS(t *testing.T) {
"",
"",
"",
true,
string(`../../integration/http/localhost.crt`),
string(`../../integration/http/localhost.key`),
string(`../../integration/http/rootCA.crt`),
Expand Down Expand Up @@ -798,6 +799,7 @@ func TestHTTP_Write_TLS(t *testing.T) {
"",
"",
"",
true,
string(`../../integration/http/localhost.crt`),
string(`../../integration/http/localhost.key`),
string(`../../integration/http/rootCA.crt`),
Expand Down Expand Up @@ -832,6 +834,7 @@ func TestHTTP_Write_TLS(t *testing.T) {
"",
"",
"",
false,
"",
"",
"",
Expand Down Expand Up @@ -997,7 +1000,7 @@ func TestHTTP_Write_GroupedRequests(t *testing.T) {
defer server.Close()

//dynamicHeaders enabled
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", "", "", "", true, true, "", "", "", "", "", defaultResponseRules())
target, err := newHTTPTarget(server.URL, 5, 5, 1048576, 1048576, "application/json", "", "", "", false, "", "", "", true, true, "", "", "", "", "", defaultResponseRules())
if err != nil {
t.Fatal(err)
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/target/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type KafkaConfig struct {
SASLUsername string `hcl:"sasl_username,optional"`
SASLPassword string `hcl:"sasl_password,optional"`
SASLAlgorithm string `hcl:"sasl_algorithm,optional"`
EnableTLS bool `hcl:"enable_tls,optional"`
CertFile string `hcl:"cert_file,optional"`
KeyFile string `hcl:"key_file,optional"`
CaFile string `hcl:"ca_file,optional"`
Expand Down Expand Up @@ -111,14 +112,13 @@ func NewKafkaTarget(cfg *KafkaConfig) (*KafkaTarget, error) {
}
}

// returns nil if certs are empty
tlsConfig, err := common.CreateTLSConfiguration(cfg.CertFile, cfg.KeyFile, cfg.CaFile, cfg.SkipVerifyTLS)
if err != nil {
return nil, err
}
if tlsConfig != nil {
saramaConfig.Net.TLS.Config = tlsConfig
saramaConfig.Net.TLS.Enable = true
}
saramaConfig.Net.TLS.Enable = cfg.EnableTLS
saramaConfig.Net.TLS.Config = tlsConfig

var asyncResults chan *saramaResult = nil
var asyncProducer sarama.AsyncProducer = nil
Expand Down Expand Up @@ -187,6 +187,7 @@ func (f KafkaTargetAdapter) ProvideDefault() (interface{}, error) {
MaxRetries: 10,
ByteLimit: 1048576,
SASLAlgorithm: "sha512",
EnableTLS: false,
}

return cfg, nil
Expand Down

0 comments on commit babacd9

Please sign in to comment.