Skip to content

Commit

Permalink
Add metrics for the opensearch source (#3304)
Browse files Browse the repository at this point in the history
Add metrics for the opensearch source

Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 authored Sep 6, 2023
1 parent a3c0b07 commit eff31fe
Show file tree
Hide file tree
Showing 12 changed files with 297 additions and 27 deletions.
1 change: 1 addition & 0 deletions data-prepper-plugins/opensearch-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dependencies {
implementation project(':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation 'software.amazon.awssdk:apache-client'
implementation 'io.micrometer:micrometer-core'
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2'
implementation 'software.amazon.awssdk:s3'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.NoSearchContextWorker;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.OpenSearchIndexPartitionCreationSupplier;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.PitWorker;
Expand Down Expand Up @@ -42,6 +43,7 @@ public class OpenSearchService {
private final ScheduledExecutorService scheduledExecutorService;
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final AcknowledgementSetManager acknowledgementSetManager;
private final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics;

private SearchWorker searchWorker;
private ScheduledFuture<?> searchWorkerFuture;
Expand All @@ -50,11 +52,12 @@ public static OpenSearchService createOpenSearchService(final SearchAccessor sea
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final Buffer<Record<Event>> buffer,
final AcknowledgementSetManager acknowledgementSetManager) {
final AcknowledgementSetManager acknowledgementSetManager,
final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) {
return new OpenSearchService(
searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, Executors.newSingleThreadScheduledExecutor(),
BufferAccumulator.create(buffer, openSearchSourceConfiguration.getSearchConfiguration().getBatchSize(), BUFFER_TIMEOUT),
acknowledgementSetManager);
acknowledgementSetManager, openSearchSourcePluginMetrics);
}

private OpenSearchService(final SearchAccessor searchAccessor,
Expand All @@ -63,7 +66,8 @@ private OpenSearchService(final SearchAccessor searchAccessor,
final Buffer<Record<Event>> buffer,
final ScheduledExecutorService scheduledExecutorService,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final AcknowledgementSetManager acknowledgementSetManager) {
final AcknowledgementSetManager acknowledgementSetManager,
final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) {
this.searchAccessor = searchAccessor;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.buffer = buffer;
Expand All @@ -73,18 +77,19 @@ private OpenSearchService(final SearchAccessor searchAccessor,
this.scheduledExecutorService = scheduledExecutorService;
this.bufferAccumulator = bufferAccumulator;
this.acknowledgementSetManager = acknowledgementSetManager;
this.openSearchSourcePluginMetrics = openSearchSourcePluginMetrics;
}

public void start() {
switch(searchAccessor.getSearchContextType()) {
case POINT_IN_TIME:
searchWorker = new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager);
searchWorker = new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics);
break;
case SCROLL:
searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager);
searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics);
break;
case NONE:
searchWorker = new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager);
searchWorker = new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager, openSearchSourcePluginMetrics);
break;
default:
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
Expand All @@ -14,6 +15,7 @@
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.UsesSourceCoordination;
import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier;
import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.OpenSearchClientFactory;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessorStrategy;
Expand All @@ -24,17 +26,20 @@ public class OpenSearchSource implements Source<Record<Event>>, UsesSourceCoordi
private final AwsCredentialsSupplier awsCredentialsSupplier;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final AcknowledgementSetManager acknowledgementSetManager;
private final PluginMetrics pluginMetrics;

private SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private OpenSearchService openSearchService;

@DataPrepperPluginConstructor
public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final AwsCredentialsSupplier awsCredentialsSupplier,
final AcknowledgementSetManager acknowledgementSetManager) {
final AcknowledgementSetManager acknowledgementSetManager,
final PluginMetrics pluginMetrics) {
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.awsCredentialsSupplier = awsCredentialsSupplier;
this.acknowledgementSetManager = acknowledgementSetManager;
this.pluginMetrics = pluginMetrics;

openSearchSourceConfiguration.validateAwsConfigWithUsernameAndPassword();
}
Expand All @@ -47,14 +52,16 @@ public void start(final Buffer<Record<Event>> buffer) {
startProcess(openSearchSourceConfiguration, buffer);
}

private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration, final Buffer<Record<Event>> buffer) {
private void startProcess(final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final Buffer<Record<Event>> buffer) {

final OpenSearchClientFactory openSearchClientFactory = OpenSearchClientFactory.create(awsCredentialsSupplier);
final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics = OpenSearchSourcePluginMetrics.create(pluginMetrics);
final SearchAccessorStrategy searchAccessorStrategy = SearchAccessorStrategy.create(openSearchSourceConfiguration, openSearchClientFactory);

final SearchAccessor searchAccessor = searchAccessorStrategy.getSearchAccessor();

openSearchService = OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager);
openSearchService = OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager, openSearchSourcePluginMetrics);
openSearchService.start();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.source.opensearch.metrics;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.metrics.PluginMetrics;

public class OpenSearchSourcePluginMetrics {

static final String DOCUMENTS_PROCESSED = "documentsProcessed";
static final String INDICES_PROCESSED = "indicesProcessed";
static final String INDEX_PROCESSING_TIME_ELAPSED = "indexProcessingTime";
static final String PROCESSING_ERRORS = "processingErrors";

private final Counter documentsProcessedCounter;
private final Counter indicesProcessedCounter;
private final Counter processingErrorsCounter;
private final Timer indexProcessingTimeTimer;

public static OpenSearchSourcePluginMetrics create(final PluginMetrics pluginMetrics) {
return new OpenSearchSourcePluginMetrics(pluginMetrics);
}

private OpenSearchSourcePluginMetrics(final PluginMetrics pluginMetrics) {
documentsProcessedCounter = pluginMetrics.counter(DOCUMENTS_PROCESSED);
indicesProcessedCounter = pluginMetrics.counter(INDICES_PROCESSED);
processingErrorsCounter = pluginMetrics.counter(PROCESSING_ERRORS);
indexProcessingTimeTimer = pluginMetrics.timer(INDEX_PROCESSING_TIME_ELAPSED);
}

public Counter getDocumentsProcessedCounter() {
return documentsProcessedCounter;
}

public Counter getIndicesProcessedCounter() {
return indicesProcessedCounter;
}

public Counter getProcessingErrorsCounter() {
return processingErrorsCounter;
}

public Timer getIndexProcessingTimeTimer() {
return indexProcessingTimeTimer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchIndexProgressState;
import org.opensearch.dataprepper.plugins.source.opensearch.OpenSearchSourceConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.configuration.SearchConfiguration;
import org.opensearch.dataprepper.plugins.source.opensearch.metrics.OpenSearchSourcePluginMetrics;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.SearchAccessor;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.exceptions.IndexNotFoundException;
import org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.NoSearchContextSearchRequest;
Expand Down Expand Up @@ -48,19 +49,22 @@ public class NoSearchContextWorker implements SearchWorker, Runnable {
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;
private final AcknowledgementSetManager acknowledgementSetManager;
private final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics;

public NoSearchContextWorker(final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier,
final AcknowledgementSetManager acknowledgementSetManager) {
final AcknowledgementSetManager acknowledgementSetManager,
final OpenSearchSourcePluginMetrics openSearchSourcePluginMetrics) {
this.searchAccessor = searchAccessor;
this.sourceCoordinator = sourceCoordinator;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.bufferAccumulator = bufferAccumulator;
this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier;
this.acknowledgementSetManager = acknowledgementSetManager;
this.openSearchSourcePluginMetrics = openSearchSourcePluginMetrics;
}

@Override
Expand Down Expand Up @@ -88,11 +92,12 @@ public void run() {
sourceCoordinator,
indexPartition.get());

processIndex(indexPartition.get(), acknowledgementSet.getLeft());
openSearchSourcePluginMetrics.getIndexProcessingTimeTimer().record(() -> processIndex(indexPartition.get(), acknowledgementSet.getLeft()));

completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet.getLeft(), acknowledgementSet.getRight(),
indexPartition.get(), sourceCoordinator);

openSearchSourcePluginMetrics.getIndicesProcessedCounter().increment();
} catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) {
LOG.warn("The search_after worker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage());
sourceCoordinator.giveUpPartitions();
Expand All @@ -102,8 +107,10 @@ public void run() {
} catch (final Exception e) {
LOG.error("Unknown exception while processing index '{}', moving on to another index:", indexPartition.get().getPartitionKey(), e);
sourceCoordinator.giveUpPartitions();
openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment();
}
} catch (final Exception e) {
openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment();
LOG.error("Received an exception while trying to get index to process with search_after, backing off and retrying", e);
try {
Thread.sleep(STANDARD_BACKOFF_MILLIS);
Expand Down Expand Up @@ -143,7 +150,9 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
acknowledgementSet.add(record.getData());
}
bufferAccumulator.add(record);
openSearchSourcePluginMetrics.getDocumentsProcessedCounter().increment();
} catch (Exception e) {
openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment();
LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}",
record.getData().getMetadata().getAttribute(DOCUMENT_ID_METADATA_ATTRIBUTE_NAME),
record.getData().getMetadata().getAttribute(INDEX_METADATA_ATTRIBUTE_NAME), e.getMessage());
Expand All @@ -157,6 +166,7 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op
try {
bufferAccumulator.flush();
} catch (final Exception e) {
openSearchSourcePluginMetrics.getProcessingErrorsCounter().increment();
LOG.error("Failed writing remaining OpenSearch documents to buffer due to: {}", e.getMessage());
}
}
Expand Down
Loading

0 comments on commit eff31fe

Please sign in to comment.