Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add disable_authentication flag to the opensearch source #2942

Merged
merged 1 commit into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions data-prepper-plugins/opensearch-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ opensearch-source-pipeline:
- `password` (Optional) : A String of password used in the internal users of OpenSearch cluster. Default is null.


- `disable_authentication` (Optional) : A boolean that can disable authentication if the cluster supports it. Defaults to false.


- `aws` (Optional) : AWS configurations. See [AWS Configuration](#aws_configuration) for details. SigV4 is enabled by default when this option is used.


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConf
final AwsCredentialsSupplier awsCredentialsSupplier) {
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.awsCredentialsSupplier = awsCredentialsSupplier;

openSearchSourceConfiguration.validateAwsConfigWithUsernameAndPassword();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.AssertTrue;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.AwsAuthenticationConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.ConnectionConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.IndexParametersConfiguration;
Expand All @@ -20,13 +19,6 @@

public class OpenSearchSourceConfiguration {

/**
* 0 indicates infinite retries
*/
@JsonProperty("max_retries")
@Min(0)
private Integer maxRetries = 0;

@NotNull
@JsonProperty("hosts")
private List<String> hosts;
Expand All @@ -37,6 +29,9 @@ public class OpenSearchSourceConfiguration {
@JsonProperty("password")
private String password;

@JsonProperty("disable_authentication")
private Boolean disableAuthentication = false;

@JsonProperty("connection")
@Valid
private ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration();
Expand All @@ -57,10 +52,6 @@ public class OpenSearchSourceConfiguration {
@Valid
private SearchConfiguration searchConfiguration = new SearchConfiguration();

public Integer getMaxRetries() {
return maxRetries;
}

public List<String> getHosts() {
return hosts;
}
Expand All @@ -73,6 +64,8 @@ public String getPassword() {
return password;
}

public Boolean isAuthenticationDisabled() { return disableAuthentication; }

public ConnectionConfiguration getConnectionConfiguration() {
return connectionConfiguration;
}
Expand All @@ -93,10 +86,13 @@ public SearchConfiguration getSearchConfiguration() {
return searchConfiguration;
}

@AssertTrue(message = "Either username and password, or aws options must be specified. Both cannot be set at once.")
boolean validateAwsConfigWithUsernameAndPassword() {
return !((Objects.nonNull(awsAuthenticationOptions) && (Objects.nonNull(username) || Objects.nonNull(password))) ||
(Objects.isNull(awsAuthenticationOptions) && (Objects.isNull(username) || Objects.isNull(password))));
void validateAwsConfigWithUsernameAndPassword() {

if (((Objects.nonNull(awsAuthenticationOptions) && ((Objects.nonNull(username) || Objects.nonNull(password)) || disableAuthentication)) ||
(Objects.nonNull(username) || Objects.nonNull(password)) && disableAuthentication) ||
(Objects.isNull(awsAuthenticationOptions) && (Objects.isNull(username) || Objects.isNull(password)) && !disableAuthentication)) {
Comment on lines +91 to +93
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: it's easier to get a sense of the conditionals by breaking this into a few booleans. Something like

final boolean isAwsAuthValid = ..
final boolean is userNamePasswordValid = ..

if (isAwsAuthValid && !userNamePasswordValid ...)

throw new InvalidPluginConfigurationException("Either username and password, or aws options must be specified. Both cannot be set at once. Authentication can be disabled by setting the disable_authentication flag to true.");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,7 @@ private RestClient createOpenSearchRestClient(final OpenSearchSourceConfiguratio

final RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);

LOG.info("Using username and password for auth for the OpenSearch source");
attachUsernamePassword(restClientBuilder, openSearchSourceConfiguration);
attachBasicAuth(restClientBuilder, openSearchSourceConfiguration);

setConnectAndSocketTimeout(restClientBuilder, openSearchSourceConfiguration);

Expand All @@ -161,33 +160,36 @@ private org.elasticsearch.client.RestClient createElasticSearchRestClient(final
new BasicHeader("Content-type", "application/json")
});

LOG.info("Using username and password for auth for the OpenSearch source");
attachUsernamePassword(restClientBuilder, openSearchSourceConfiguration);

attachBasicAuth(restClientBuilder, openSearchSourceConfiguration);
setConnectAndSocketTimeout(restClientBuilder, openSearchSourceConfiguration);

return restClientBuilder.build();
}

private void attachUsernamePassword(final RestClientBuilder restClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(openSearchSourceConfiguration.getUsername(), openSearchSourceConfiguration.getPassword()));
private void attachBasicAuth(final RestClientBuilder restClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {

restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
if (!openSearchSourceConfiguration.isAuthenticationDisabled()) {
attachUsernameAndPassword(httpClientBuilder, openSearchSourceConfiguration);
} else {
LOG.warn("Authentication was explicitly disabled for the OpenSearch source");
}

attachSSLContext(httpClientBuilder, openSearchSourceConfiguration);
return httpClientBuilder;
});
}

private void attachUsernamePassword(final org.elasticsearch.client.RestClientBuilder restClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(openSearchSourceConfiguration.getUsername(), openSearchSourceConfiguration.getPassword()));
private void attachBasicAuth(final org.elasticsearch.client.RestClientBuilder restClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {

restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);

if (!openSearchSourceConfiguration.isAuthenticationDisabled()) {
attachUsernameAndPassword(httpClientBuilder, openSearchSourceConfiguration);
} else {
LOG.warn("Authentication was explicitly disabled for the OpenSearch source");
}

attachSSLContext(httpClientBuilder, openSearchSourceConfiguration);
httpClientBuilder.addInterceptorLast(
(HttpResponseInterceptor)
Expand All @@ -211,6 +213,15 @@ private void setConnectAndSocketTimeout(final RestClientBuilder restClientBuilde
});
}

private void attachUsernameAndPassword(final HttpAsyncClientBuilder httpClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
LOG.info("Using username and password for auth for the OpenSearch source");

final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(openSearchSourceConfiguration.getUsername(), openSearchSourceConfiguration.getPassword()));
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}

private void setConnectAndSocketTimeout(final org.elasticsearch.client.RestClientBuilder restClientBuilder, final OpenSearchSourceConfiguration openSearchSourceConfiguration) {
restClientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
if (Objects.nonNull(openSearchSourceConfiguration.getConnectionConfiguration().getConnectTimeout())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class OpenSearchSourceConfigurationTest {

Expand All @@ -21,7 +23,7 @@ public class OpenSearchSourceConfigurationTest {
@Test
void open_search_source_username_password_only() throws JsonProcessingException {

final String sourceConfigurationYaml = "max_retries: 5\n" +
final String sourceConfigurationYaml =
"hosts: [\"http://localhost:9200\"]\n" +
"username: test\n" +
"password: test\n" +
Expand All @@ -44,18 +46,49 @@ void open_search_source_username_password_only() throws JsonProcessingException
assertThat(sourceConfiguration.getIndexParametersConfiguration(), notNullValue());
assertThat(sourceConfiguration.getSchedulingParameterConfiguration(), notNullValue());
assertThat(sourceConfiguration.getHosts(), notNullValue());
assertThat(sourceConfiguration.getMaxRetries(), equalTo(5));

assertThat(sourceConfiguration.validateAwsConfigWithUsernameAndPassword(), equalTo(true));
sourceConfiguration.validateAwsConfigWithUsernameAndPassword();
assertThat(sourceConfiguration.getPassword(), equalTo("test"));
assertThat(sourceConfiguration.getUsername(), equalTo("test"));
assertThat(sourceConfiguration.getAwsAuthenticationOptions(), equalTo(null));
}

@Test
void opensearch_source_aws_only() throws JsonProcessingException {
final String sourceConfigurationYaml = "max_retries: 5\n" +
void open_search_disabled_authentication() throws JsonProcessingException {

final String sourceConfigurationYaml =
"hosts: [\"http://localhost:9200\"]\n" +
"disable_authentication: true\n" +
"connection:\n" +
" insecure: true\n" +
" cert: \"cert\"\n" +
"indices:\n" +
" include:\n" +
" - index_name_regex: \"regex\"\n" +
" - index_name_regex: \"regex-two\"\n" +
"scheduling:\n" +
" job_count: 3\n" +
"search_options:\n" +
" batch_size: 1000\n" +
" query: \"test\"\n";
final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

assertThat(sourceConfiguration.getSearchConfiguration(), notNullValue());
assertThat(sourceConfiguration.getConnectionConfiguration(), notNullValue());
assertThat(sourceConfiguration.getIndexParametersConfiguration(), notNullValue());
assertThat(sourceConfiguration.getSchedulingParameterConfiguration(), notNullValue());
assertThat(sourceConfiguration.getHosts(), notNullValue());

sourceConfiguration.validateAwsConfigWithUsernameAndPassword();
assertThat(sourceConfiguration.isAuthenticationDisabled(), equalTo(true));
assertThat(sourceConfiguration.getPassword(), equalTo(null));
assertThat(sourceConfiguration.getUsername(), equalTo(null));
assertThat(sourceConfiguration.getAwsAuthenticationOptions(), equalTo(null));
}

@Test
void opensearch_source_aws_only() throws JsonProcessingException {
final String sourceConfigurationYaml = "hosts: [\"http://localhost:9200\"]\n" +
"connection:\n" +
" insecure: true\n" +
" cert: \"cert\"\n" +
Expand All @@ -74,7 +107,7 @@ void opensearch_source_aws_only() throws JsonProcessingException {

final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

assertThat(sourceConfiguration.validateAwsConfigWithUsernameAndPassword(), equalTo(true));
sourceConfiguration.validateAwsConfigWithUsernameAndPassword();
assertThat(sourceConfiguration.getPassword(), equalTo(null));
assertThat(sourceConfiguration.getUsername(), equalTo(null));
assertThat(sourceConfiguration.getAwsAuthenticationOptions(), notNullValue());
Expand All @@ -85,8 +118,7 @@ void opensearch_source_aws_only() throws JsonProcessingException {

@Test
void opensearch_source_aws_sts_external_id() throws JsonProcessingException {
final String sourceConfigurationYaml = "max_retries: 5\n" +
"hosts: [\"http://localhost:9200\"]\n" +
final String sourceConfigurationYaml = "hosts: [\"http://localhost:9200\"]\n" +
"connection:\n" +
" insecure: true\n" +
" cert: \"cert\"\n" +
Expand All @@ -106,7 +138,7 @@ void opensearch_source_aws_sts_external_id() throws JsonProcessingException {

final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

assertThat(sourceConfiguration.validateAwsConfigWithUsernameAndPassword(), equalTo(true));
sourceConfiguration.validateAwsConfigWithUsernameAndPassword();
assertThat(sourceConfiguration.getPassword(), equalTo(null));
assertThat(sourceConfiguration.getUsername(), equalTo(null));
assertThat(sourceConfiguration.getAwsAuthenticationOptions(), notNullValue());
Expand Down Expand Up @@ -141,14 +173,12 @@ void using_both_aws_config_and_username_password_is_invalid() throws JsonProcess

final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

assertThat(sourceConfiguration.validateAwsConfigWithUsernameAndPassword(), equalTo(false));
assertThat(sourceConfiguration.getMaxRetries(), equalTo(0));
assertThrows(InvalidPluginConfigurationException.class, sourceConfiguration::validateAwsConfigWithUsernameAndPassword);
}

@Test
void one_of_username_password_or_aws_config_is_required() throws JsonProcessingException {
void one_of_username_password_or_aws_config_or_authDisabled_is_required() throws JsonProcessingException {
final String sourceConfigurationYaml =
"max_retries: 5\n" +
"hosts: [\"http://localhost:9200\"]\n" +
"connection:\n" +
" insecure: true\n" +
Expand All @@ -165,6 +195,6 @@ void one_of_username_password_or_aws_config_is_required() throws JsonProcessingE

final OpenSearchSourceConfiguration sourceConfiguration = objectMapper.readValue(sourceConfigurationYaml, OpenSearchSourceConfiguration.class);

assertThat(sourceConfiguration.validateAwsConfigWithUsernameAndPassword(), equalTo(false));
assertThrows(InvalidPluginConfigurationException.class, sourceConfiguration::validateAwsConfigWithUsernameAndPassword);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -119,4 +121,38 @@ void provideOpenSearchClient_with_aws_auth() {
assertThat(awsCredentialsOptions.getStsHeaderOverrides(), equalTo(Collections.emptyMap()));
assertThat(awsCredentialsOptions.getStsRoleArn(), equalTo(stsRoleArn));
}

@Test
void provideElasticSearchClient_with_auth_disabled() {
when(openSearchSourceConfiguration.isAuthenticationDisabled()).thenReturn(true);

when(connectionConfiguration.getCertPath()).thenReturn(null);
when(connectionConfiguration.getSocketTimeout()).thenReturn(null);
when(connectionConfiguration.getConnectTimeout()).thenReturn(null);
when(connectionConfiguration.isInsecure()).thenReturn(true);

final ElasticsearchClient elasticsearchClient = createObjectUnderTest().provideElasticSearchClient(openSearchSourceConfiguration);
assertThat(elasticsearchClient, notNullValue());

verifyNoInteractions(awsCredentialsSupplier);
verify(openSearchSourceConfiguration, never()).getUsername();
verify(openSearchSourceConfiguration, never()).getPassword();
}

@Test
void provideOpenSearchClient_with_auth_disabled() {
when(openSearchSourceConfiguration.isAuthenticationDisabled()).thenReturn(true);

when(connectionConfiguration.getCertPath()).thenReturn(null);
when(connectionConfiguration.getSocketTimeout()).thenReturn(null);
when(connectionConfiguration.getConnectTimeout()).thenReturn(null);
when(connectionConfiguration.isInsecure()).thenReturn(true);

final OpenSearchClient openSearchClient = createObjectUnderTest().provideOpenSearchClient(openSearchSourceConfiguration);
assertThat(openSearchClient, notNullValue());

verifyNoInteractions(awsCredentialsSupplier);
verify(openSearchSourceConfiguration, never()).getUsername();
verify(openSearchSourceConfiguration, never()).getPassword();
}
}