diff --git a/data-prepper-plugins/opensearch-source/README.md b/data-prepper-plugins/opensearch-source/README.md index 4894ba646d..b904d67378 100644 --- a/data-prepper-plugins/opensearch-source/README.md +++ b/data-prepper-plugins/opensearch-source/README.md @@ -114,6 +114,9 @@ opensearch-source-pipeline: - `password` (Optional) : A String of password used in the internal users of OpenSearch cluster. Default is null. +- `disable_authentication` (Optional) : A boolean that can disable authentication if the cluster supports it. Defaults to false. + + - `aws` (Optional) : AWS configurations. See [AWS Configuration](#aws_configuration) for details. SigV4 is enabled by default when this option is used. diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java index 9a91db3ac1..22455dec3f 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSource.java @@ -31,6 +31,8 @@ public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConf final AwsCredentialsSupplier awsCredentialsSupplier) { this.openSearchSourceConfiguration = openSearchSourceConfiguration; this.awsCredentialsSupplier = awsCredentialsSupplier; + + openSearchSourceConfiguration.validateAwsConfigWithUsernameAndPassword(); } @Override diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java index 8e2fd42384..a100e17b20 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfiguration.java @@ -6,9 +6,8 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; -import jakarta.validation.constraints.AssertTrue; -import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.AwsAuthenticationConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ConnectionConfiguration; import org.opensearch.dataprepper.plugins.source.opensearch.configuration.IndexParametersConfiguration; @@ -20,13 +19,6 @@ public class OpenSearchSourceConfiguration { - /** - * 0 indicates infinite retries - */ - @JsonProperty("max_retries") - @Min(0) - private Integer maxRetries = 0; - @NotNull @JsonProperty("hosts") private List hosts; @@ -37,6 +29,9 @@ public class OpenSearchSourceConfiguration { @JsonProperty("password") private String password; + @JsonProperty("disable_authentication") + private Boolean disableAuthentication = false; + @JsonProperty("connection") @Valid private ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration(); @@ -57,10 +52,6 @@ public class OpenSearchSourceConfiguration { @Valid private SearchConfiguration searchConfiguration = new SearchConfiguration(); - public Integer getMaxRetries() { - return maxRetries; - } - public List getHosts() { return hosts; } @@ -73,6 +64,8 @@ public String getPassword() { return password; } + public Boolean isAuthenticationDisabled() { return disableAuthentication; } + public ConnectionConfiguration getConnectionConfiguration() { return connectionConfiguration; } @@ -93,10 +86,13 @@ public SearchConfiguration getSearchConfiguration() { return searchConfiguration; } - @AssertTrue(message = "Either username and password, or aws options must be specified. Both cannot be set at once.") - boolean validateAwsConfigWithUsernameAndPassword() { - return !((Objects.nonNull(awsAuthenticationOptions) && (Objects.nonNull(username) || Objects.nonNull(password))) || - (Objects.isNull(awsAuthenticationOptions) && (Objects.isNull(username) || Objects.isNull(password)))); + void validateAwsConfigWithUsernameAndPassword() { + + if (((Objects.nonNull(awsAuthenticationOptions) && ((Objects.nonNull(username) || Objects.nonNull(password)) || disableAuthentication)) || + (Objects.nonNull(username) || Objects.nonNull(password)) && disableAuthentication) || + (Objects.isNull(awsAuthenticationOptions) && (Objects.isNull(username) || Objects.isNull(password)) && !disableAuthentication)) { + throw new InvalidPluginConfigurationException("Either username and password, or aws options must be specified. Both cannot be set at once. Authentication can be disabled by setting the disable_authentication flag to true."); + } } } diff --git a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java index d9e3a2f739..e588e1f711 100644 --- a/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java +++ b/data-prepper-plugins/opensearch-source/src/main/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactory.java @@ -133,8 +133,7 @@ private RestClient createOpenSearchRestClient(final OpenSearchSourceConfiguratio final RestClientBuilder restClientBuilder = RestClient.builder(httpHosts); - LOG.info("Using username and password for auth for the OpenSearch source"); - attachUsernamePassword(restClientBuilder, openSearchSourceConfiguration); + attachBasicAuth(restClientBuilder, openSearchSourceConfiguration); setConnectAndSocketTimeout(restClientBuilder, openSearchSourceConfiguration); @@ -161,33 +160,36 @@ private org.elasticsearch.client.RestClient createElasticSearchRestClient(final new BasicHeader("Content-type", "application/json") }); - LOG.info("Using username and password for auth for the OpenSearch source"); - attachUsernamePassword(restClientBuilder, openSearchSourceConfiguration); - + attachBasicAuth(restClientBuilder, openSearchSourceConfiguration); setConnectAndSocketTimeout(restClientBuilder, openSearchSourceConfiguration); return restClientBuilder.build(); } - private void attachUsernamePassword(final RestClientBuilder restClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(openSearchSourceConfiguration.getUsername(), openSearchSourceConfiguration.getPassword())); + private void attachBasicAuth(final RestClientBuilder restClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) { restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> { - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + if (!openSearchSourceConfiguration.isAuthenticationDisabled()) { + attachUsernameAndPassword(httpClientBuilder, openSearchSourceConfiguration); + } else { + LOG.warn("Authentication was explicitly disabled for the OpenSearch source"); + } + attachSSLContext(httpClientBuilder, openSearchSourceConfiguration); return httpClientBuilder; }); } - private void attachUsernamePassword(final org.elasticsearch.client.RestClientBuilder restClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) { - final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); - credentialsProvider.setCredentials(AuthScope.ANY, - new UsernamePasswordCredentials(openSearchSourceConfiguration.getUsername(), openSearchSourceConfiguration.getPassword())); + private void attachBasicAuth(final org.elasticsearch.client.RestClientBuilder restClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) { restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> { - httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + + if (!openSearchSourceConfiguration.isAuthenticationDisabled()) { + attachUsernameAndPassword(httpClientBuilder, openSearchSourceConfiguration); + } else { + LOG.warn("Authentication was explicitly disabled for the OpenSearch source"); + } + attachSSLContext(httpClientBuilder, openSearchSourceConfiguration); httpClientBuilder.addInterceptorLast( (HttpResponseInterceptor) @@ -211,6 +213,15 @@ private void setConnectAndSocketTimeout(final RestClientBuilder restClientBuilde }); } + private void attachUsernameAndPassword(final HttpAsyncClientBuilder httpClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) { + LOG.info("Using username and password for auth for the OpenSearch source"); + + final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(AuthScope.ANY, + new UsernamePasswordCredentials(openSearchSourceConfiguration.getUsername(), openSearchSourceConfiguration.getPassword())); + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } + private void setConnectAndSocketTimeout(final org.elasticsearch.client.RestClientBuilder restClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) { restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> { if (Objects.nonNull(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout())) { diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfigurationTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfigurationTest.java index 7c9f8dbd19..950533c145 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfigurationTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/OpenSearchSourceConfigurationTest.java @@ -9,10 +9,12 @@ import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; public class OpenSearchSourceConfigurationTest { @@ -21,7 +23,7 @@ public class OpenSearchSourceConfigurationTest { @Test void open_search_source_username_password_only() throws JsonProcessingException { - final String sourceConfigurationYaml = "max_retries: 5\n" + + final String sourceConfigurationYaml = "hosts: [\"http://localhost:9200\"]\n" + "username: test\n" + "password: test\n" + @@ -44,18 +46,49 @@ void open_search_source_username_password_only() throws JsonProcessingException assertThat(sourceConfiguration.getIndexParametersConfiguration(), notNullValue()); assertThat(sourceConfiguration.getSchedulingParameterConfiguration(), notNullValue()); assertThat(sourceConfiguration.getHosts(), notNullValue()); - assertThat(sourceConfiguration.getMaxRetries(), equalTo(5)); - assertThat(sourceConfiguration.validateAwsConfigWithUsernameAndPassword(), equalTo(true)); + sourceConfiguration.validateAwsConfigWithUsernameAndPassword(); assertThat(sourceConfiguration.getPassword(), equalTo("test")); assertThat(sourceConfiguration.getUsername(), equalTo("test")); assertThat(sourceConfiguration.getAwsAuthenticationOptions(), equalTo(null)); } @Test - void opensearch_source_aws_only() throws JsonProcessingException { - final String sourceConfigurationYaml = "max_retries: 5\n" + + void open_search_disabled_authentication() throws JsonProcessingException { + + final String sourceConfigurationYaml = "hosts: [\"http://localhost:9200\"]\n" + + "disable_authentication: true\n" + + "connection:\n" + + " insecure: true\n" + + " cert: \"cert\"\n" + + "indices:\n" + + " include:\n" + + " - index_name_regex: \"regex\"\n" + + " - index_name_regex: \"regex-two\"\n" + + "scheduling:\n" + + " job_count: 3\n" + + "search_options:\n" + + " batch_size: 1000\n" + + " query: \"test\"\n"; + final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class); + + assertThat(sourceConfiguration.getSearchConfiguration(), notNullValue()); + assertThat(sourceConfiguration.getConnectionConfiguration(), notNullValue()); + assertThat(sourceConfiguration.getIndexParametersConfiguration(), notNullValue()); + assertThat(sourceConfiguration.getSchedulingParameterConfiguration(), notNullValue()); + assertThat(sourceConfiguration.getHosts(), notNullValue()); + + sourceConfiguration.validateAwsConfigWithUsernameAndPassword(); + assertThat(sourceConfiguration.isAuthenticationDisabled(), equalTo(true)); + assertThat(sourceConfiguration.getPassword(), equalTo(null)); + assertThat(sourceConfiguration.getUsername(), equalTo(null)); + assertThat(sourceConfiguration.getAwsAuthenticationOptions(), equalTo(null)); + } + + @Test + void opensearch_source_aws_only() throws JsonProcessingException { + final String sourceConfigurationYaml = "hosts: [\"http://localhost:9200\"]\n" + "connection:\n" + " insecure: true\n" + " cert: \"cert\"\n" + @@ -74,7 +107,7 @@ void opensearch_source_aws_only() throws JsonProcessingException { final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class); - assertThat(sourceConfiguration.validateAwsConfigWithUsernameAndPassword(), equalTo(true)); + sourceConfiguration.validateAwsConfigWithUsernameAndPassword(); assertThat(sourceConfiguration.getPassword(), equalTo(null)); assertThat(sourceConfiguration.getUsername(), equalTo(null)); assertThat(sourceConfiguration.getAwsAuthenticationOptions(), notNullValue()); @@ -85,8 +118,7 @@ void opensearch_source_aws_only() throws JsonProcessingException { @Test void opensearch_source_aws_sts_external_id() throws JsonProcessingException { - final String sourceConfigurationYaml = "max_retries: 5\n" + - "hosts: [\"http://localhost:9200\"]\n" + + final String sourceConfigurationYaml = "hosts: [\"http://localhost:9200\"]\n" + "connection:\n" + " insecure: true\n" + " cert: \"cert\"\n" + @@ -106,7 +138,7 @@ void opensearch_source_aws_sts_external_id() throws JsonProcessingException { final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class); - assertThat(sourceConfiguration.validateAwsConfigWithUsernameAndPassword(), equalTo(true)); + sourceConfiguration.validateAwsConfigWithUsernameAndPassword(); assertThat(sourceConfiguration.getPassword(), equalTo(null)); assertThat(sourceConfiguration.getUsername(), equalTo(null)); assertThat(sourceConfiguration.getAwsAuthenticationOptions(), notNullValue()); @@ -141,14 +173,12 @@ void using_both_aws_config_and_username_password_is_invalid() throws JsonProcess final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class); - assertThat(sourceConfiguration.validateAwsConfigWithUsernameAndPassword(), equalTo(false)); - assertThat(sourceConfiguration.getMaxRetries(), equalTo(0)); + assertThrows(InvalidPluginConfigurationException.class, sourceConfiguration::validateAwsConfigWithUsernameAndPassword); } @Test - void one_of_username_password_or_aws_config_is_required() throws JsonProcessingException { + void one_of_username_password_or_aws_config_or_authDisabled_is_required() throws JsonProcessingException { final String sourceConfigurationYaml = - "max_retries: 5\n" + "hosts: [\"http://localhost:9200\"]\n" + "connection:\n" + " insecure: true\n" + @@ -165,6 +195,6 @@ void one_of_username_password_or_aws_config_is_required() throws JsonProcessingE final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class); - assertThat(sourceConfiguration.validateAwsConfigWithUsernameAndPassword(), equalTo(false)); + assertThrows(InvalidPluginConfigurationException.class, sourceConfiguration::validateAwsConfigWithUsernameAndPassword); } } diff --git a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java index cc811625d1..1cd2ad551c 100644 --- a/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java +++ b/data-prepper-plugins/opensearch-source/src/test/java/org/opensearch/dataprepper/plugins/source/opensearch/worker/client/OpenSearchClientFactoryTest.java @@ -29,6 +29,8 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.notNullValue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @@ -119,4 +121,38 @@ void provideOpenSearchClient_with_aws_auth() { assertThat(awsCredentialsOptions.getStsHeaderOverrides(), equalTo(Collections.emptyMap())); assertThat(awsCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn)); } + + @Test + void provideElasticSearchClient_with_auth_disabled() { + when(openSearchSourceConfiguration.isAuthenticationDisabled()).thenReturn(true); + + when(connectionConfiguration.getCertPath()).thenReturn(null); + when(connectionConfiguration.getSocketTimeout()).thenReturn(null); + when(connectionConfiguration.getConnectTimeout()).thenReturn(null); + when(connectionConfiguration.isInsecure()).thenReturn(true); + + final ElasticsearchClient elasticsearchClient = createObjectUnderTest().provideElasticSearchClient(openSearchSourceConfiguration); + assertThat(elasticsearchClient, notNullValue()); + + verifyNoInteractions(awsCredentialsSupplier); + verify(openSearchSourceConfiguration, never()).getUsername(); + verify(openSearchSourceConfiguration, never()).getPassword(); + } + + @Test + void provideOpenSearchClient_with_auth_disabled() { + when(openSearchSourceConfiguration.isAuthenticationDisabled()).thenReturn(true); + + when(connectionConfiguration.getCertPath()).thenReturn(null); + when(connectionConfiguration.getSocketTimeout()).thenReturn(null); + when(connectionConfiguration.getConnectTimeout()).thenReturn(null); + when(connectionConfiguration.isInsecure()).thenReturn(true); + + final OpenSearchClient openSearchClient = createObjectUnderTest().provideOpenSearchClient(openSearchSourceConfiguration); + assertThat(openSearchClient, notNullValue()); + + verifyNoInteractions(awsCredentialsSupplier); + verify(openSearchSourceConfiguration, never()).getUsername(); + verify(openSearchSourceConfiguration, never()).getPassword(); + } }