diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index a9d91e78f0..8b376f16af 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -43,6 +43,7 @@ import org.opensearch.dataprepper.model.event.EventType; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.SinkContext; @@ -92,6 +93,7 @@ import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createContentParser; import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchIntegrationHelper.createOpenSearchClient; @@ -131,8 +133,12 @@ public class OpenSearchSinkIT { @Mock private ExpressionEvaluator expressionEvaluator; + @Mock + private PluginConfigObservable pluginConfigObservable; + public OpenSearchSink createObjectUnderTest(PluginSetting pluginSetting, boolean doInitialize) { - OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, null, expressionEvaluator, awsCredentialsSupplier); + OpenSearchSink sink = new OpenSearchSink( + pluginSetting, pluginFactory, null, expressionEvaluator, awsCredentialsSupplier, pluginConfigObservable); if (doInitialize) { sink.doInitialize(); } @@ -143,7 +149,8 @@ public OpenSearchSink createObjectUnderTestWithSinkContext(PluginSetting pluginS sinkContext = mock(SinkContext.class); testTagsTargetKey = RandomStringUtils.randomAlphabetic(5); when(sinkContext.getTagsTargetKey()).thenReturn(testTagsTargetKey); - OpenSearchSink sink = new OpenSearchSink(pluginSetting, pluginFactory, sinkContext, expressionEvaluator, awsCredentialsSupplier); + OpenSearchSink sink = new OpenSearchSink( + pluginSetting, pluginFactory, sinkContext, expressionEvaluator, awsCredentialsSupplier, pluginConfigObservable); if (doInitialize) { sink.doInitialize(); } @@ -152,7 +159,7 @@ public OpenSearchSink createObjectUnderTestWithSinkContext(PluginSetting pluginS @BeforeEach public void setup() { - + pluginConfigObservable = mock(PluginConfigObservable.class); expressionEvaluator = mock(ExpressionEvaluator.class); when(expressionEvaluator.isValidExpressionStatement(any(String.class))).thenReturn(false); @@ -1140,6 +1147,7 @@ public void testEventOutput() throws IOException, InterruptedException { final PluginSetting pluginSetting = generatePluginSetting(IndexType.TRACE_ANALYTICS_RAW.getValue(), null, null); final OpenSearchSink sink = createObjectUnderTest(pluginSetting, true); + verify(pluginConfigObservable).addPluginConfigObserver(any()); sink.output(testRecords); final String expIndexAlias = IndexConstants.TYPE_TO_DEFAULT_ALIAS.get(IndexType.TRACE_ANALYTICS_RAW); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java new file mode 100644 index 0000000000..7db4e22fc5 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresher.java @@ -0,0 +1,75 @@ +package org.opensearch.dataprepper.plugins.sink.opensearch; + +import io.micrometer.core.instrument.Counter; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginComponentRefresher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Objects; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Function; + +public class OpenSearchClientRefresher implements PluginComponentRefresher { + static final String CREDENTIALS_CHANGED = "credentialsChanged"; + static final String CLIENT_REFRESH_ERRORS = "clientRefreshErrors"; + private static final Logger LOG = LoggerFactory.getLogger(OpenSearchClientRefresher.class); + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Function clientFunction; + private OpenSearchClient currentClient; + private ConnectionConfiguration currentConfig; + + private final Counter credentialsChangeCounter; + private final Counter clientRefreshErrorsCounter; + + public OpenSearchClientRefresher(final PluginMetrics pluginMetrics, + final ConnectionConfiguration connectionConfiguration, + final Function clientFunction) { + this.clientFunction = clientFunction; + this.currentConfig = connectionConfiguration; + this.currentClient = clientFunction.apply(connectionConfiguration); + credentialsChangeCounter = pluginMetrics.counter(CREDENTIALS_CHANGED); + clientRefreshErrorsCounter = pluginMetrics.counter(CLIENT_REFRESH_ERRORS); + } + + @Override + public Class getComponentClass() { + return OpenSearchClient.class; + } + + @Override + public OpenSearchClient get() { + readWriteLock.readLock().lock(); + try { + return currentClient; + } finally { + readWriteLock.readLock().unlock(); + } + } + + @Override + public void update(PluginSetting pluginSetting) { + final ConnectionConfiguration newConfig = ConnectionConfiguration.readConnectionConfiguration(pluginSetting); + if (basicAuthChanged(newConfig)) { + credentialsChangeCounter.increment(); + readWriteLock.writeLock().lock(); + try { + currentClient = clientFunction.apply(newConfig); + currentConfig = newConfig; + } catch (Exception e) { + clientRefreshErrorsCounter.increment(); + LOG.error("Refreshing {} failed.", getComponentClass(), e); + } finally { + readWriteLock.writeLock().unlock(); + } + } + } + + private boolean basicAuthChanged(final ConnectionConfiguration newConfig) { + return !Objects.equals(currentConfig.getUsername(), newConfig.getUsername()) || + !Objects.equals(currentConfig.getPassword(), newConfig.getPassword()); + } +} diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 8f20daa561..e1547a925c 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -35,6 +35,7 @@ import org.opensearch.dataprepper.model.failures.DlqObject; import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.AbstractSink; @@ -81,6 +82,7 @@ import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -128,6 +130,7 @@ public class OpenSearchSink extends AbstractSink> { private final DistributionSummary bulkRequestSizeBytesSummary; private final Counter dynamicDocumentVersionDroppedEvents; private OpenSearchClient openSearchClient; + private OpenSearchClientRefresher openSearchClientRefresher; private ObjectMapper objectMapper; private volatile boolean initialized; private PluginSetting pluginSetting; @@ -139,13 +142,15 @@ public class OpenSearchSink extends AbstractSink> { private DlqProvider dlqProvider; private final ConcurrentHashMap> bulkRequestMap; private final ConcurrentHashMap lastFlushTimeMap; + private final PluginConfigObservable pluginConfigObservable; @DataPrepperPluginConstructor public OpenSearchSink(final PluginSetting pluginSetting, final PluginFactory pluginFactory, final SinkContext sinkContext, final ExpressionEvaluator expressionEvaluator, - final AwsCredentialsSupplier awsCredentialsSupplier) { + final AwsCredentialsSupplier awsCredentialsSupplier, + final PluginConfigObservable pluginConfigObservable) { super(pluginSetting, Integer.MAX_VALUE, INITIALIZE_RETRY_WAIT_TIME_MS); this.awsCredentialsSupplier = awsCredentialsSupplier; this.sinkContext = sinkContext != null ? sinkContext : new SinkContext(null, Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); @@ -178,6 +183,7 @@ public OpenSearchSink(final PluginSetting pluginSetting, this.pluginSetting = pluginSetting; this.bulkRequestMap = new ConcurrentHashMap<>(); this.lastFlushTimeMap = new ConcurrentHashMap<>(); + this.pluginConfigObservable = pluginConfigObservable; final Optional dlqConfig = openSearchSinkConfig.getRetryConfiguration().getDlq(); if (dlqConfig.isPresent()) { @@ -210,8 +216,21 @@ public void doInitialize() { private void doInitializeInternal() throws IOException { LOG.info("Initializing OpenSearch sink"); - restHighLevelClient = openSearchSinkConfig.getConnectionConfiguration().createClient(awsCredentialsSupplier); - openSearchClient = openSearchSinkConfig.getConnectionConfiguration().createOpenSearchClient(restHighLevelClient, awsCredentialsSupplier); + final ConnectionConfiguration connectionConfiguration = openSearchSinkConfig.getConnectionConfiguration(); + restHighLevelClient = connectionConfiguration.createClient(awsCredentialsSupplier); + openSearchClient = connectionConfiguration.createOpenSearchClient(restHighLevelClient, awsCredentialsSupplier); + final Function clientFunction = + (connectionConfiguration1) -> { + final RestHighLevelClient restHighLevelClient1 = connectionConfiguration1.createClient(awsCredentialsSupplier); + return connectionConfiguration1.createOpenSearchClient(restHighLevelClient1, awsCredentialsSupplier).withTransportOptions( + TransportOptions.builder() + .setParameter("filter_path", "errors,took,items.*.error,items.*.status,items.*._index,items.*._id") + .build()); + }; + openSearchClientRefresher = new OpenSearchClientRefresher( + pluginMetrics, connectionConfiguration, clientFunction); + pluginConfigObservable.addPluginConfigObserver( + newPluginSetting -> openSearchClientRefresher.update((PluginSetting) newPluginSetting)); configuredIndexAlias = openSearchSinkConfig.getIndexConfiguration().getIndexAlias(); final IndexTemplateAPIWrapper indexTemplateAPIWrapper = IndexTemplateAPIWrapperFactory.getWrapper( openSearchSinkConfig.getIndexConfiguration(), openSearchClient); @@ -249,11 +268,8 @@ private void doInitializeInternal() throws IOException { } final int maxRetries = openSearchSinkConfig.getRetryConfiguration().getMaxRetries(); - final OpenSearchClient filteringOpenSearchClient = openSearchClient.withTransportOptions( - TransportOptions.builder() - .setParameter("filter_path", "errors,took,items.*.error,items.*.status,items.*._index,items.*._id") - .build()); - bulkApiWrapper = BulkApiWrapperFactory.getWrapper(openSearchSinkConfig.getIndexConfiguration(), filteringOpenSearchClient); + bulkApiWrapper = BulkApiWrapperFactory.getWrapper(openSearchSinkConfig.getIndexConfiguration(), + () -> openSearchClientRefresher.get()); bulkRetryStrategy = new BulkRetryStrategy(bulkRequest -> bulkApiWrapper.bulk(bulkRequest.getRequest()), this::logFailureForBulkRequests, pluginMetrics, diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/BulkApiWrapperFactory.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/BulkApiWrapperFactory.java index 2c7874d8db..e1f943e4b7 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/BulkApiWrapperFactory.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/BulkApiWrapperFactory.java @@ -4,13 +4,15 @@ import org.opensearch.dataprepper.plugins.sink.opensearch.DistributionVersion; import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexConfiguration; +import java.util.function.Supplier; + public class BulkApiWrapperFactory { public static BulkApiWrapper getWrapper(final IndexConfiguration indexConfiguration, - final OpenSearchClient openSearchClient) { + final Supplier openSearchClientSupplier) { if (DistributionVersion.ES6.equals(indexConfiguration.getDistributionVersion())) { - return new Es6BulkApiWrapper(openSearchClient); + return new Es6BulkApiWrapper(openSearchClientSupplier); } else { - return new OpenSearchDefaultBulkApiWrapper(openSearchClient); + return new OpenSearchDefaultBulkApiWrapper(openSearchClientSupplier); } } } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/Es6BulkApiWrapper.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/Es6BulkApiWrapper.java index 6215d48582..801522419a 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/Es6BulkApiWrapper.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/Es6BulkApiWrapper.java @@ -14,18 +14,20 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.function.Supplier; import java.util.stream.Collectors; public class Es6BulkApiWrapper implements BulkApiWrapper { - private final OpenSearchClient openSearchClient; + private final Supplier openSearchClientSupplier; - public Es6BulkApiWrapper(final OpenSearchClient openSearchClient) { - this.openSearchClient = openSearchClient; + public Es6BulkApiWrapper(final Supplier openSearchClientSupplier) { + this.openSearchClientSupplier = openSearchClientSupplier; } @Override public BulkResponse bulk(BulkRequest request) throws IOException, OpenSearchException { final JsonEndpoint endpoint = es6BulkEndpoint(request); + final OpenSearchClient openSearchClient = openSearchClientSupplier.get(); return openSearchClient._transport().performRequest(request, endpoint, openSearchClient._transportOptions()); } diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/OpenSearchDefaultBulkApiWrapper.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/OpenSearchDefaultBulkApiWrapper.java index d6b27557bf..408dd77d85 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/OpenSearchDefaultBulkApiWrapper.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/OpenSearchDefaultBulkApiWrapper.java @@ -5,16 +5,17 @@ import org.opensearch.client.opensearch.core.BulkResponse; import java.io.IOException; +import java.util.function.Supplier; public class OpenSearchDefaultBulkApiWrapper implements BulkApiWrapper { - private final OpenSearchClient openSearchClient; + private final Supplier openSearchClientSupplier; - public OpenSearchDefaultBulkApiWrapper(final OpenSearchClient openSearchClient) { - this.openSearchClient = openSearchClient; + public OpenSearchDefaultBulkApiWrapper(final Supplier openSearchClientSupplier) { + this.openSearchClientSupplier = openSearchClientSupplier; } @Override public BulkResponse bulk(BulkRequest request) throws IOException { - return openSearchClient.bulk(request); + return openSearchClientSupplier.get().bulk(request); } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java new file mode 100644 index 0000000000..81f6a21222 --- /dev/null +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchClientRefresherTest.java @@ -0,0 +1,161 @@ +package org.opensearch.dataprepper.plugins.sink.opensearch; + +import io.micrometer.core.instrument.Counter; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginSetting; + +import java.util.function.Function; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchClientRefresher.CLIENT_REFRESH_ERRORS; +import static org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchClientRefresher.CREDENTIALS_CHANGED; + +@ExtendWith(MockitoExtension.class) +class OpenSearchClientRefresherTest { + private static final String TEST_USERNAME = "test_user"; + private static final String TEST_PASSWORD = "test_password"; + + @Mock + private Function clientFunction; + + @Mock + private ConnectionConfiguration connectionConfiguration; + + @Mock + private OpenSearchClient openSearchClient; + + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private Counter credentialsChangeCounter; + + @Mock + private Counter clientRefreshErrorsCounter; + + private OpenSearchClientRefresher createObjectUnderTest() { + return new OpenSearchClientRefresher(pluginMetrics, connectionConfiguration, clientFunction); + } + + @BeforeEach + void setUp() { + when(clientFunction.apply(eq(connectionConfiguration))).thenReturn(openSearchClient); + } + + @Test + void testGet() { + final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); + } + + @Test + void testGetAfterUpdateWithBasicAuthUnchanged() { + final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + verify(clientFunction, times(1)).apply(any()); + when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME); + when(connectionConfiguration.getPassword()).thenReturn(TEST_PASSWORD); + final PluginSetting newConfig = mock(PluginSetting.class); + final ConnectionConfiguration newConnectionConfiguration = mock(ConnectionConfiguration.class); + when(newConnectionConfiguration.getUsername()).thenReturn(TEST_USERNAME); + when(newConnectionConfiguration.getPassword()).thenReturn(TEST_PASSWORD); + try (MockedStatic configurationMockedStatic = mockStatic( + ConnectionConfiguration.class)) { + configurationMockedStatic.when(() -> ConnectionConfiguration.readConnectionConfiguration(eq(newConfig))) + .thenReturn(newConnectionConfiguration); + objectUnderTest.update(newConfig); + } + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); + verifyNoMoreInteractions(clientFunction); + } + + @Test + void testGetAfterUpdateWithUsernameChanged() { + when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); + final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + verify(clientFunction, times(1)).apply(any()); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); + when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME); + final PluginSetting newConfig = mock(PluginSetting.class); + final ConnectionConfiguration newConnectionConfiguration = mock(ConnectionConfiguration.class); + when(newConnectionConfiguration.getUsername()).thenReturn(TEST_USERNAME + "_changed"); + final OpenSearchClient newClient = mock(OpenSearchClient.class); + when(clientFunction.apply(eq(newConnectionConfiguration))).thenReturn(newClient); + try (MockedStatic configurationMockedStatic = mockStatic( + ConnectionConfiguration.class)) { + configurationMockedStatic.when(() -> ConnectionConfiguration.readConnectionConfiguration(eq(newConfig))) + .thenReturn(newConnectionConfiguration); + objectUnderTest.update(newConfig); + } + assertThat(objectUnderTest.get(), equalTo(newClient)); + verify(credentialsChangeCounter).increment(); + verify(clientFunction, times(2)).apply(any()); + } + + @Test + void testGetAfterUpdateWithPasswordChanged() { + when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); + final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + verify(clientFunction, times(1)).apply(any()); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); + when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME); + when(connectionConfiguration.getPassword()).thenReturn(TEST_PASSWORD); + final PluginSetting newConfig = mock(PluginSetting.class); + final ConnectionConfiguration newConnectionConfiguration = mock(ConnectionConfiguration.class); + when(newConnectionConfiguration.getUsername()).thenReturn(TEST_USERNAME); + when(newConnectionConfiguration.getPassword()).thenReturn(TEST_PASSWORD + "_changed"); + final OpenSearchClient newClient = mock(OpenSearchClient.class); + when(clientFunction.apply(eq(newConnectionConfiguration))).thenReturn(newClient); + try (MockedStatic configurationMockedStatic = mockStatic( + ConnectionConfiguration.class)) { + configurationMockedStatic.when(() -> ConnectionConfiguration.readConnectionConfiguration(eq(newConfig))) + .thenReturn(newConnectionConfiguration); + objectUnderTest.update(newConfig); + } + assertThat(objectUnderTest.get(), equalTo(newClient)); + verify(credentialsChangeCounter).increment(); + verify(clientFunction, times(2)).apply(any()); + } + + @Test + void testGetAfterUpdateClientFailure() { + when(pluginMetrics.counter(CREDENTIALS_CHANGED)).thenReturn(credentialsChangeCounter); + when(pluginMetrics.counter(CLIENT_REFRESH_ERRORS)).thenReturn(clientRefreshErrorsCounter); + final OpenSearchClientRefresher objectUnderTest = createObjectUnderTest(); + verify(clientFunction, times(1)).apply(any()); + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); + when(connectionConfiguration.getUsername()).thenReturn(TEST_USERNAME); + when(connectionConfiguration.getPassword()).thenReturn(TEST_PASSWORD); + final PluginSetting newConfig = mock(PluginSetting.class); + final ConnectionConfiguration newConnectionConfiguration = mock(ConnectionConfiguration.class); + when(newConnectionConfiguration.getUsername()).thenReturn(TEST_USERNAME); + when(newConnectionConfiguration.getPassword()).thenReturn(TEST_PASSWORD + "_changed"); + final OpenSearchClient newClient = mock(OpenSearchClient.class); + when(clientFunction.apply(eq(newConnectionConfiguration))).thenThrow(RuntimeException.class); + try (MockedStatic configurationMockedStatic = mockStatic( + ConnectionConfiguration.class)) { + configurationMockedStatic.when(() -> ConnectionConfiguration.readConnectionConfiguration(eq(newConfig))) + .thenReturn(newConnectionConfiguration); + objectUnderTest.update(newConfig); + } + assertThat(objectUnderTest.get(), equalTo(openSearchClient)); + verify(credentialsChangeCounter).increment(); + verify(clientRefreshErrorsCounter).increment(); + verify(clientFunction, times(2)).apply(any()); + } +} \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java index 05996e936e..9b51954b62 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java @@ -27,6 +27,7 @@ import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.failures.DlqObject; +import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.sink.SinkContext; @@ -123,6 +124,9 @@ public class OpenSearchSinkTest { @Mock private Counter dynamicDocumentVersionDroppedEvents; + @Mock + private PluginConfigObservable pluginConfigObservable; + @BeforeEach void setup() { when(pluginSetting.getPipelineName()).thenReturn(UUID.randomUUID().toString()); @@ -167,9 +171,9 @@ void setup() { when(pluginMetrics.counter(INVALID_VERSION_EXPRESSION_DROPPED_EVENTS)).thenReturn(dynamicDocumentVersionDroppedEvents); when(pluginMetrics.summary(BULKREQUEST_SIZE_BYTES)).thenReturn(bulkRequestSizeBytesSummary); - when(sinkContext.getTagsTargetKey()).thenReturn(null); - when(sinkContext.getIncludeKeys()).thenReturn(null); - when(sinkContext.getExcludeKeys()).thenReturn(null); + lenient().when(sinkContext.getTagsTargetKey()).thenReturn(null); + lenient().when(sinkContext.getIncludeKeys()).thenReturn(null); + lenient().when(sinkContext.getExcludeKeys()).thenReturn(null); } private OpenSearchSink createObjectUnderTest() throws IOException { @@ -181,10 +185,22 @@ private OpenSearchSink createObjectUnderTest() throws IOException { pluginMetricsMockedStatic.when(() -> PluginMetrics.fromPluginSetting(pluginSetting)).thenReturn(pluginMetrics); openSearchSinkConfigurationMockedStatic.when(() -> OpenSearchSinkConfiguration.readESConfig(pluginSetting, expressionEvaluator)) .thenReturn(openSearchSinkConfiguration); - return new OpenSearchSink(pluginSetting, pluginFactory, sinkContext, expressionEvaluator, awsCredentialsSupplier); + return new OpenSearchSink( + pluginSetting, pluginFactory, sinkContext, expressionEvaluator, awsCredentialsSupplier, pluginConfigObservable); } } + @Test + void test_initialization() throws IOException { + final OpenSearchSink objectUnderTest = createObjectUnderTest(); + when(indexManagerFactory.getIndexManager(any(IndexType.class), eq(openSearchClient), any(RestHighLevelClient.class), eq(openSearchSinkConfiguration), any(TemplateStrategy.class), any())) + .thenReturn(indexManager); + doNothing().when(indexManager).setupIndex(); + objectUnderTest.initialize(); + + verify(pluginConfigObservable).addPluginConfigObserver(any()); + } + @Test void doOutput_with_invalid_version_expression_catches_NumberFormatException_and_creates_DLQObject() throws IOException { diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/BulkApiWrapperFactoryTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/BulkApiWrapperFactoryTest.java index c8884227be..a65610386b 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/BulkApiWrapperFactoryTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/BulkApiWrapperFactoryTest.java @@ -23,14 +23,14 @@ class BulkApiWrapperFactoryTest { @Test void testGetEs6BulkApiWrapper() { when(indexConfiguration.getDistributionVersion()).thenReturn(DistributionVersion.ES6); - assertThat(BulkApiWrapperFactory.getWrapper(indexConfiguration, openSearchClient), + assertThat(BulkApiWrapperFactory.getWrapper(indexConfiguration, () -> openSearchClient), instanceOf(Es6BulkApiWrapper.class)); } @Test void testGetOpenSearchDefaultBulkApiWrapper() { when(indexConfiguration.getDistributionVersion()).thenReturn(DistributionVersion.DEFAULT); - assertThat(BulkApiWrapperFactory.getWrapper(indexConfiguration, openSearchClient), + assertThat(BulkApiWrapperFactory.getWrapper(indexConfiguration, () -> openSearchClient), instanceOf(OpenSearchDefaultBulkApiWrapper.class)); } } \ No newline at end of file diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/Es6BulkApiWrapperTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/Es6BulkApiWrapperTest.java index 9bce8a8ac1..558671f091 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/Es6BulkApiWrapperTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/Es6BulkApiWrapperTest.java @@ -75,7 +75,7 @@ class Es6BulkApiWrapperTest { @BeforeEach void setUp() { - objectUnderTest = new Es6BulkApiWrapper(openSearchClient); + objectUnderTest = new Es6BulkApiWrapper(() -> openSearchClient); testIndex = RandomStringUtils.randomAlphabetic(5); lenient().when(bulkOperation.index()).thenReturn(indexOperation); lenient().when(bulkOperation.create()).thenReturn(createOperation); diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/OpenSearchDefaultBulkApiWrapperTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/OpenSearchDefaultBulkApiWrapperTest.java index 925dee67b3..598ea53d68 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/OpenSearchDefaultBulkApiWrapperTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/bulk/OpenSearchDefaultBulkApiWrapperTest.java @@ -24,7 +24,7 @@ class OpenSearchDefaultBulkApiWrapperTest { @BeforeEach void setUp() { - objectUnderTest = new OpenSearchDefaultBulkApiWrapper(openSearchClient); + objectUnderTest = new OpenSearchDefaultBulkApiWrapper(() -> openSearchClient); } @Test