Skip to content

Commit

Permalink
ENH: automatic credential refresher opensearch sink (#4283)
Browse files Browse the repository at this point in the history
ENH: automatic credential refresher opensearch sink

Signed-off-by: George Chen <qchea@amazon.com>
  • Loading branch information
chenqi0805 authored Mar 28, 2024
1 parent 6765098 commit 5a7dbda
Show file tree
Hide file tree
Showing 11 changed files with 310 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.dataprepper.model.event.EventType;
import org.opensearch.dataprepper.model.event.JacksonEvent;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.SinkContext;
Expand Down Expand Up @@ -92,6 +93,7 @@
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createContentParser;
import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createOpenSearchClient;
Expand Down Expand Up @@ -131,8 +133,12 @@ public class OpenSearchSinkIT {
@Mock
private ExpressionEvaluator expressionEvaluator;

@Mock
private PluginConfigObservable pluginConfigObservable;

public OpenSearchSink createObjectUnderTest(PluginSetting pluginSetting, boolean doInitialize) {
OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, null, expressionEvaluator, awsCredentialsSupplier);
OpenSearchSink sink = new OpenSearchSink(
pluginSetting, pluginFactory, null, expressionEvaluator, awsCredentialsSupplier, pluginConfigObservable);
if (doInitialize) {
sink.doInitialize();
}
Expand All @@ -143,7 +149,8 @@ public OpenSearchSink createObjectUnderTestWithSinkContext(PluginSetting pluginS
sinkContext = mock(SinkContext.class);
testTagsTargetKey = RandomStringUtils.randomAlphabetic(5);
when(sinkContext.getTagsTargetKey()).thenReturn(testTagsTargetKey);
OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, sinkContext, expressionEvaluator, awsCredentialsSupplier);
OpenSearchSink sink = new OpenSearchSink(
pluginSetting, pluginFactory, sinkContext, expressionEvaluator, awsCredentialsSupplier, pluginConfigObservable);
if (doInitialize) {
sink.doInitialize();
}
Expand All @@ -152,7 +159,7 @@ public OpenSearchSink createObjectUnderTestWithSinkContext(PluginSetting pluginS

@BeforeEach
public void setup() {

pluginConfigObservable = mock(PluginConfigObservable.class);
expressionEvaluator = mock(ExpressionEvaluator.class);
when(expressionEvaluator.isValidExpressionStatement(any(String.class))).thenReturn(false);

Expand Down Expand Up @@ -1140,6 +1147,7 @@ public void testEventOutput() throws IOException, InterruptedException {

final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null);
final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true);
verify(pluginConfigObservable).addPluginConfigObserver(any());
sink.output(testRecords);

final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.opensearch.dataprepper.plugins.sink.opensearch;

import io.micrometer.core.instrument.Counter;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.configuration.PluginSetting;
import org.opensearch.dataprepper.model.plugin.PluginComponentRefresher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

public class OpenSearchClientRefresher implements PluginComponentRefresher<OpenSearchClient, PluginSetting> {
static final String CREDENTIALS_CHANGED = "credentialsChanged";
static final String CLIENT_REFRESH_ERRORS = "clientRefreshErrors";
private static final Logger LOG = LoggerFactory.getLogger(OpenSearchClientRefresher.class);
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final Function<ConnectionConfiguration, OpenSearchClient> clientFunction;
private OpenSearchClient currentClient;
private ConnectionConfiguration currentConfig;

private final Counter credentialsChangeCounter;
private final Counter clientRefreshErrorsCounter;

public OpenSearchClientRefresher(final PluginMetrics pluginMetrics,
final ConnectionConfiguration connectionConfiguration,
final Function<ConnectionConfiguration, OpenSearchClient> clientFunction) {
this.clientFunction = clientFunction;
this.currentConfig = connectionConfiguration;
this.currentClient = clientFunction.apply(connectionConfiguration);
credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED);
clientRefreshErrorsCounter = pluginMetrics.counter(CLIENT_REFRESH_ERRORS);
}

@Override
public Class<OpenSearchClient> getComponentClass() {
return OpenSearchClient.class;
}

@Override
public OpenSearchClient get() {
readWriteLock.readLock().lock();
try {
return currentClient;
} finally {
readWriteLock.readLock().unlock();
}
}

@Override
public void update(PluginSetting pluginSetting) {
final ConnectionConfiguration newConfig = ConnectionConfiguration.readConnectionConfiguration(pluginSetting);
if (basicAuthChanged(newConfig)) {
credentialsChangeCounter.increment();
readWriteLock.writeLock().lock();
try {
currentClient = clientFunction.apply(newConfig);
currentConfig = newConfig;
} catch (Exception e) {
clientRefreshErrorsCounter.increment();
LOG.error("Refreshing {} failed.", getComponentClass(), e);
} finally {
readWriteLock.writeLock().unlock();
}
}
}

private boolean basicAuthChanged(final ConnectionConfiguration newConfig) {
return !Objects.equals(currentConfig.getUsername(), newConfig.getUsername()) ||
!Objects.equals(currentConfig.getPassword(), newConfig.getPassword());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.dataprepper.model.failures.DlqObject;
import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.plugin.PluginConfigObservable;
import org.opensearch.dataprepper.model.plugin.PluginFactory;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.sink.AbstractSink;
Expand Down Expand Up @@ -81,6 +82,7 @@
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -128,6 +130,7 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private final DistributionSummary bulkRequestSizeBytesSummary;
private final Counter dynamicDocumentVersionDroppedEvents;
private OpenSearchClient openSearchClient;
private OpenSearchClientRefresher openSearchClientRefresher;
private ObjectMapper objectMapper;
private volatile boolean initialized;
private PluginSetting pluginSetting;
Expand All @@ -139,13 +142,15 @@ public class OpenSearchSink extends AbstractSink<Record<Event>> {
private DlqProvider dlqProvider;
private final ConcurrentHashMap<Long, AccumulatingBulkRequest<BulkOperationWrapper, BulkRequest>> bulkRequestMap;
private final ConcurrentHashMap<Long, Long> lastFlushTimeMap;
private final PluginConfigObservable pluginConfigObservable;

@DataPrepperPluginConstructor
public OpenSearchSink(final PluginSetting pluginSetting,
final PluginFactory pluginFactory,
final SinkContext sinkContext,
final ExpressionEvaluator expressionEvaluator,
final AwsCredentialsSupplier awsCredentialsSupplier) {
final AwsCredentialsSupplier awsCredentialsSupplier,
final PluginConfigObservable pluginConfigObservable) {
super(pluginSetting, Integer.MAX_VALUE, INITIALIZE_RETRY_WAIT_TIME_MS);
this.awsCredentialsSupplier = awsCredentialsSupplier;
this.sinkContext = sinkContext != null ? sinkContext : new SinkContext(null, Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
Expand Down Expand Up @@ -178,6 +183,7 @@ public OpenSearchSink(final PluginSetting pluginSetting,
this.pluginSetting = pluginSetting;
this.bulkRequestMap = new ConcurrentHashMap<>();
this.lastFlushTimeMap = new ConcurrentHashMap<>();
this.pluginConfigObservable = pluginConfigObservable;

final Optional<PluginModel> dlqConfig = openSearchSinkConfig.getRetryConfiguration().getDlq();
if (dlqConfig.isPresent()) {
Expand Down Expand Up @@ -210,8 +216,21 @@ public void doInitialize() {

private void doInitializeInternal() throws IOException {
LOG.info("Initializing OpenSearch sink");
restHighLevelClient = openSearchSinkConfig.getConnectionConfiguration().createClient(awsCredentialsSupplier);
openSearchClient = openSearchSinkConfig.getConnectionConfiguration().createOpenSearchClient(restHighLevelClient, awsCredentialsSupplier);
final ConnectionConfiguration connectionConfiguration = openSearchSinkConfig.getConnectionConfiguration();
restHighLevelClient = connectionConfiguration.createClient(awsCredentialsSupplier);
openSearchClient = connectionConfiguration.createOpenSearchClient(restHighLevelClient, awsCredentialsSupplier);
final Function<ConnectionConfiguration, OpenSearchClient> clientFunction =
(connectionConfiguration1) -> {
final RestHighLevelClient restHighLevelClient1 = connectionConfiguration1.createClient(awsCredentialsSupplier);
return connectionConfiguration1.createOpenSearchClient(restHighLevelClient1, awsCredentialsSupplier).withTransportOptions(
TransportOptions.builder()
.setParameter("filter_path", "errors,took,items.*.error,items.*.status,items.*._index,items.*._id")
.build());
};
openSearchClientRefresher = new OpenSearchClientRefresher(
pluginMetrics, connectionConfiguration, clientFunction);
pluginConfigObservable.addPluginConfigObserver(
newPluginSetting -> openSearchClientRefresher.update((PluginSetting) newPluginSetting));
configuredIndexAlias = openSearchSinkConfig.getIndexConfiguration().getIndexAlias();
final IndexTemplateAPIWrapper indexTemplateAPIWrapper = IndexTemplateAPIWrapperFactory.getWrapper(
openSearchSinkConfig.getIndexConfiguration(), openSearchClient);
Expand Down Expand Up @@ -249,11 +268,8 @@ private void doInitializeInternal() throws IOException {
}

final int maxRetries = openSearchSinkConfig.getRetryConfiguration().getMaxRetries();
final OpenSearchClient filteringOpenSearchClient = openSearchClient.withTransportOptions(
TransportOptions.builder()
.setParameter("filter_path", "errors,took,items.*.error,items.*.status,items.*._index,items.*._id")
.build());
bulkApiWrapper = BulkApiWrapperFactory.getWrapper(openSearchSinkConfig.getIndexConfiguration(), filteringOpenSearchClient);
bulkApiWrapper = BulkApiWrapperFactory.getWrapper(openSearchSinkConfig.getIndexConfiguration(),
() -> openSearchClientRefresher.get());
bulkRetryStrategy = new BulkRetryStrategy(bulkRequest -> bulkApiWrapper.bulk(bulkRequest.getRequest()),
this::logFailureForBulkRequests,
pluginMetrics,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion;
import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration;

import java.util.function.Supplier;

public class BulkApiWrapperFactory {
public static BulkApiWrapper getWrapper(final IndexConfiguration indexConfiguration,
final OpenSearchClient openSearchClient) {
final Supplier<OpenSearchClient> openSearchClientSupplier) {
if (DistributionVersion.ES6.equals(indexConfiguration.getDistributionVersion())) {
return new Es6BulkApiWrapper(openSearchClient);
return new Es6BulkApiWrapper(openSearchClientSupplier);
} else {
return new OpenSearchDefaultBulkApiWrapper(openSearchClient);
return new OpenSearchDefaultBulkApiWrapper(openSearchClientSupplier);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class Es6BulkApiWrapper implements BulkApiWrapper {
private final OpenSearchClient openSearchClient;
private final Supplier<OpenSearchClient> openSearchClientSupplier;

public Es6BulkApiWrapper(final OpenSearchClient openSearchClient) {
this.openSearchClient = openSearchClient;
public Es6BulkApiWrapper(final Supplier<OpenSearchClient> openSearchClientSupplier) {
this.openSearchClientSupplier = openSearchClientSupplier;
}

@Override
public BulkResponse bulk(BulkRequest request) throws IOException, OpenSearchException {
final JsonEndpoint<BulkRequest, BulkResponse, ErrorResponse> endpoint = es6BulkEndpoint(request);
final OpenSearchClient openSearchClient = openSearchClientSupplier.get();
return openSearchClient._transport().performRequest(request, endpoint, openSearchClient._transportOptions());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@
import org.opensearch.client.opensearch.core.BulkResponse;

import java.io.IOException;
import java.util.function.Supplier;

public class OpenSearchDefaultBulkApiWrapper implements BulkApiWrapper {
private final OpenSearchClient openSearchClient;
private final Supplier<OpenSearchClient> openSearchClientSupplier;

public OpenSearchDefaultBulkApiWrapper(final OpenSearchClient openSearchClient) {
this.openSearchClient = openSearchClient;
public OpenSearchDefaultBulkApiWrapper(final Supplier<OpenSearchClient> openSearchClientSupplier) {
this.openSearchClientSupplier = openSearchClientSupplier;
}

@Override
public BulkResponse bulk(BulkRequest request) throws IOException {
return openSearchClient.bulk(request);
return openSearchClientSupplier.get().bulk(request);
}
}
Loading

0 comments on commit 5a7dbda

Please sign in to comment.