Skip to content

Commit

Permalink
Add disable_authentication flag to the opensearch source (#2942)
Browse files Browse the repository at this point in the history
Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 committed Jun 29, 2023
1 parent 8e2145c commit 0d29418
Show file tree
Hide file tree
Showing 6 changed files with 124 additions and 46 deletions.
3 changes: 3 additions & 0 deletions data-prepper-plugins/opensearch-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ opensearch-source-pipeline:
- `password` (Optional) : A String of password used in the internal users of OpenSearch cluster. Default is null.


- `disable_authentication` (Optional) : A boolean that can disable authentication if the cluster supports it. Defaults to false.


- `aws` (Optional) : AWS configurations. See [AWS Configuration](#aws_configuration) for details. SigV4 is enabled by default when this option is used.


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConf
final AwsCredentialsSupplier awsCredentialsSupplier) {
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.awsCredentialsSupplier = awsCredentialsSupplier;

openSearchSourceConfiguration.validateAwsConfigWithUsernameAndPassword();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.AwsAuthenticationConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ConnectionConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.IndexParametersConfiguration;
Expand All @@ -20,13 +19,6 @@

public class OpenSearchSourceConfiguration {

/**
* 0 indicates infinite retries
*/
@JsonProperty("max_retries")
@Min(0)
private Integer maxRetries = 0;

@NotNull
@JsonProperty("hosts")
private List<String> hosts;
Expand All @@ -37,6 +29,9 @@ public class OpenSearchSourceConfiguration {
@JsonProperty("password")
private String password;

@JsonProperty("disable_authentication")
private Boolean disableAuthentication = false;

@JsonProperty("connection")
@Valid
private ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration();
Expand All @@ -57,10 +52,6 @@ public class OpenSearchSourceConfiguration {
@Valid
private SearchConfiguration searchConfiguration = new SearchConfiguration();

public Integer getMaxRetries() {
return maxRetries;
}

public List<String> getHosts() {
return hosts;
}
Expand All @@ -73,6 +64,8 @@ public String getPassword() {
return password;
}

public Boolean isAuthenticationDisabled() { return disableAuthentication; }

public ConnectionConfiguration getConnectionConfiguration() {
return connectionConfiguration;
}
Expand All @@ -93,10 +86,13 @@ public SearchConfiguration getSearchConfiguration() {
return searchConfiguration;
}

@AssertTrue(message = "Either username and password, or aws options must be specified. Both cannot be set at once.")
boolean validateAwsConfigWithUsernameAndPassword() {
return !((Objects.nonNull(awsAuthenticationOptions) && (Objects.nonNull(username) || Objects.nonNull(password))) ||
(Objects.isNull(awsAuthenticationOptions) && (Objects.isNull(username) || Objects.isNull(password))));
void validateAwsConfigWithUsernameAndPassword() {

if (((Objects.nonNull(awsAuthenticationOptions) && ((Objects.nonNull(username) || Objects.nonNull(password)) || disableAuthentication)) ||
(Objects.nonNull(username) || Objects.nonNull(password)) && disableAuthentication) ||
(Objects.isNull(awsAuthenticationOptions) && (Objects.isNull(username) || Objects.isNull(password)) && !disableAuthentication)) {
throw new InvalidPluginConfigurationException("Either username and password, or aws options must be specified. Both cannot be set at once. Authentication can be disabled by setting the disable_authentication flag to true.");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ private RestClient createOpenSearchRestClient(final OpenSearchSourceConfiguratio

final RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);

LOG.info("Using username and password for auth for the OpenSearch source");
attachUsernamePassword(restClientBuilder, openSearchSourceConfiguration);
attachBasicAuth(restClientBuilder, openSearchSourceConfiguration);

setConnectAndSocketTimeout(restClientBuilder, openSearchSourceConfiguration);

Expand All @@ -161,33 +160,36 @@ private org.elasticsearch.client.RestClient createElasticSearchRestClient(final
new BasicHeader("Content-type", "application/json")
});

LOG.info("Using username and password for auth for the OpenSearch source");
attachUsernamePassword(restClientBuilder, openSearchSourceConfiguration);

attachBasicAuth(restClientBuilder, openSearchSourceConfiguration);
setConnectAndSocketTimeout(restClientBuilder, openSearchSourceConfiguration);

return restClientBuilder.build();
}

private void attachUsernamePassword(final RestClientBuilder restClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(openSearchSourceConfiguration.getUsername(), openSearchSourceConfiguration.getPassword()));
private void attachBasicAuth(final RestClientBuilder restClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {

restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
if (!openSearchSourceConfiguration.isAuthenticationDisabled()) {
attachUsernameAndPassword(httpClientBuilder, openSearchSourceConfiguration);
} else {
LOG.warn("Authentication was explicitly disabled for the OpenSearch source");
}

attachSSLContext(httpClientBuilder, openSearchSourceConfiguration);
return httpClientBuilder;
});
}

private void attachUsernamePassword(final org.elasticsearch.client.RestClientBuilder restClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(openSearchSourceConfiguration.getUsername(), openSearchSourceConfiguration.getPassword()));
private void attachBasicAuth(final org.elasticsearch.client.RestClientBuilder restClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {

restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);

if (!openSearchSourceConfiguration.isAuthenticationDisabled()) {
attachUsernameAndPassword(httpClientBuilder, openSearchSourceConfiguration);
} else {
LOG.warn("Authentication was explicitly disabled for the OpenSearch source");
}

attachSSLContext(httpClientBuilder, openSearchSourceConfiguration);
httpClientBuilder.addInterceptorLast(
(HttpResponseInterceptor)
Expand All @@ -211,6 +213,15 @@ private void setConnectAndSocketTimeout(final RestClientBuilder restClientBuilde
});
}

private void attachUsernameAndPassword(final HttpAsyncClientBuilder httpClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
LOG.info("Using username and password for auth for the OpenSearch source");

final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(openSearchSourceConfiguration.getUsername(), openSearchSourceConfiguration.getPassword()));
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}

private void setConnectAndSocketTimeout(final org.elasticsearch.client.RestClientBuilder restClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
if (Objects.nonNull(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class OpenSearchSourceConfigurationTest {

Expand All @@ -21,7 +23,7 @@ public class OpenSearchSourceConfigurationTest {
@Test
void open_search_source_username_password_only() throws JsonProcessingException {

final String sourceConfigurationYaml = "max_retries: 5\n" +
final String sourceConfigurationYaml =
"hosts: [\"http://localhost:9200\"]\n" +
"username: test\n" +
"password: test\n" +
Expand All @@ -44,18 +46,49 @@ void open_search_source_username_password_only() throws JsonProcessingException
assertThat(sourceConfiguration.getIndexParametersConfiguration(), notNullValue());
assertThat(sourceConfiguration.getSchedulingParameterConfiguration(), notNullValue());
assertThat(sourceConfiguration.getHosts(), notNullValue());
assertThat(sourceConfiguration.getMaxRetries(), equalTo(5));

assertThat(sourceConfiguration.validateAwsConfigWithUsernameAndPassword(), equalTo(true));
sourceConfiguration.validateAwsConfigWithUsernameAndPassword();
assertThat(sourceConfiguration.getPassword(), equalTo("test"));
assertThat(sourceConfiguration.getUsername(), equalTo("test"));
assertThat(sourceConfiguration.getAwsAuthenticationOptions(), equalTo(null));
}

@Test
void opensearch_source_aws_only() throws JsonProcessingException {
final String sourceConfigurationYaml = "max_retries: 5\n" +
void open_search_disabled_authentication() throws JsonProcessingException {

final String sourceConfigurationYaml =
"hosts: [\"http://localhost:9200\"]\n" +
"disable_authentication: true\n" +
"connection:\n" +
" insecure: true\n" +
" cert: \"cert\"\n" +
"indices:\n" +
" include:\n" +
" - index_name_regex: \"regex\"\n" +
" - index_name_regex: \"regex-two\"\n" +
"scheduling:\n" +
" job_count: 3\n" +
"search_options:\n" +
" batch_size: 1000\n" +
" query: \"test\"\n";
final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

assertThat(sourceConfiguration.getSearchConfiguration(), notNullValue());
assertThat(sourceConfiguration.getConnectionConfiguration(), notNullValue());
assertThat(sourceConfiguration.getIndexParametersConfiguration(), notNullValue());
assertThat(sourceConfiguration.getSchedulingParameterConfiguration(), notNullValue());
assertThat(sourceConfiguration.getHosts(), notNullValue());

sourceConfiguration.validateAwsConfigWithUsernameAndPassword();
assertThat(sourceConfiguration.isAuthenticationDisabled(), equalTo(true));
assertThat(sourceConfiguration.getPassword(), equalTo(null));
assertThat(sourceConfiguration.getUsername(), equalTo(null));
assertThat(sourceConfiguration.getAwsAuthenticationOptions(), equalTo(null));
}

@Test
void opensearch_source_aws_only() throws JsonProcessingException {
final String sourceConfigurationYaml = "hosts: [\"http://localhost:9200\"]\n" +
"connection:\n" +
" insecure: true\n" +
" cert: \"cert\"\n" +
Expand All @@ -74,7 +107,7 @@ void opensearch_source_aws_only() throws JsonProcessingException {

final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

assertThat(sourceConfiguration.validateAwsConfigWithUsernameAndPassword(), equalTo(true));
sourceConfiguration.validateAwsConfigWithUsernameAndPassword();
assertThat(sourceConfiguration.getPassword(), equalTo(null));
assertThat(sourceConfiguration.getUsername(), equalTo(null));
assertThat(sourceConfiguration.getAwsAuthenticationOptions(), notNullValue());
Expand All @@ -85,8 +118,7 @@ void opensearch_source_aws_only() throws JsonProcessingException {

@Test
void opensearch_source_aws_sts_external_id() throws JsonProcessingException {
final String sourceConfigurationYaml = "max_retries: 5\n" +
"hosts: [\"http://localhost:9200\"]\n" +
final String sourceConfigurationYaml = "hosts: [\"http://localhost:9200\"]\n" +
"connection:\n" +
" insecure: true\n" +
" cert: \"cert\"\n" +
Expand All @@ -106,7 +138,7 @@ void opensearch_source_aws_sts_external_id() throws JsonProcessingException {

final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

assertThat(sourceConfiguration.validateAwsConfigWithUsernameAndPassword(), equalTo(true));
sourceConfiguration.validateAwsConfigWithUsernameAndPassword();
assertThat(sourceConfiguration.getPassword(), equalTo(null));
assertThat(sourceConfiguration.getUsername(), equalTo(null));
assertThat(sourceConfiguration.getAwsAuthenticationOptions(), notNullValue());
Expand Down Expand Up @@ -141,14 +173,12 @@ void using_both_aws_config_and_username_password_is_invalid() throws JsonProcess

final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

assertThat(sourceConfiguration.validateAwsConfigWithUsernameAndPassword(), equalTo(false));
assertThat(sourceConfiguration.getMaxRetries(), equalTo(0));
assertThrows(InvalidPluginConfigurationException.class, sourceConfiguration::validateAwsConfigWithUsernameAndPassword);
}

@Test
void one_of_username_password_or_aws_config_is_required() throws JsonProcessingException {
void one_of_username_password_or_aws_config_or_authDisabled_is_required() throws JsonProcessingException {
final String sourceConfigurationYaml =
"max_retries: 5\n" +
"hosts: [\"http://localhost:9200\"]\n" +
"connection:\n" +
" insecure: true\n" +
Expand All @@ -165,6 +195,6 @@ void one_of_username_password_or_aws_config_is_required() throws JsonProcessingE

final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

assertThat(sourceConfiguration.validateAwsConfigWithUsernameAndPassword(), equalTo(false));
assertThrows(InvalidPluginConfigurationException.class, sourceConfiguration::validateAwsConfigWithUsernameAndPassword);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -119,4 +121,38 @@ void provideOpenSearchClient_with_aws_auth() {
assertThat(awsCredentialsOptions.getStsHeaderOverrides(), equalTo(Collections.emptyMap()));
assertThat(awsCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn));
}

@Test
void provideElasticSearchClient_with_auth_disabled() {
when(openSearchSourceConfiguration.isAuthenticationDisabled()).thenReturn(true);

when(connectionConfiguration.getCertPath()).thenReturn(null);
when(connectionConfiguration.getSocketTimeout()).thenReturn(null);
when(connectionConfiguration.getConnectTimeout()).thenReturn(null);
when(connectionConfiguration.isInsecure()).thenReturn(true);

final ElasticsearchClient elasticsearchClient = createObjectUnderTest().provideElasticSearchClient(openSearchSourceConfiguration);
assertThat(elasticsearchClient, notNullValue());

verifyNoInteractions(awsCredentialsSupplier);
verify(openSearchSourceConfiguration, never()).getUsername();
verify(openSearchSourceConfiguration, never()).getPassword();
}

@Test
void provideOpenSearchClient_with_auth_disabled() {
when(openSearchSourceConfiguration.isAuthenticationDisabled()).thenReturn(true);

when(connectionConfiguration.getCertPath()).thenReturn(null);
when(connectionConfiguration.getSocketTimeout()).thenReturn(null);
when(connectionConfiguration.getConnectTimeout()).thenReturn(null);
when(connectionConfiguration.isInsecure()).thenReturn(true);

final OpenSearchClient openSearchClient = createObjectUnderTest().provideOpenSearchClient(openSearchSourceConfiguration);
assertThat(openSearchClient, notNullValue());

verifyNoInteractions(awsCredentialsSupplier);
verify(openSearchSourceConfiguration, never()).getUsername();
verify(openSearchSourceConfiguration, never()).getPassword();
}
}

0 comments on commit 0d29418

Please sign in to comment.