Skip to content

Commit

Permalink
Add e2e acknowledgments support to opensearch source (opensearch-proj…
Browse files Browse the repository at this point in the history
…ect#3278)

Signed-off-by: Taylor Gray <tylgry@amazon.com>
  • Loading branch information
graytaylor0 authored Aug 31, 2023
1 parent efe5834 commit ff6a915
Show file tree
Hide file tree
Showing 12 changed files with 481 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package org.opensearch.dataprepper.plugins.source.opensearch;

import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
Expand Down Expand Up @@ -40,25 +41,29 @@ public class OpenSearchService {
private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;
private final ScheduledExecutorService scheduledExecutorService;
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final AcknowledgementSetManager acknowledgementSetManager;

private SearchWorker searchWorker;
private ScheduledFuture<?> searchWorkerFuture;

public static OpenSearchService createOpenSearchService(final SearchAccessor searchAccessor,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final Buffer<Record<Event>> buffer) {
final Buffer<Record<Event>> buffer,
final AcknowledgementSetManager acknowledgementSetManager) {
return new OpenSearchService(
searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, Executors.newSingleThreadScheduledExecutor(),
BufferAccumulator.create(buffer, openSearchSourceConfiguration.getSearchConfiguration().getBatchSize(), BUFFER_TIMEOUT));
BufferAccumulator.create(buffer, openSearchSourceConfiguration.getSearchConfiguration().getBatchSize(), BUFFER_TIMEOUT),
acknowledgementSetManager);
}

private OpenSearchService(final SearchAccessor searchAccessor,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final Buffer<Record<Event>> buffer,
final ScheduledExecutorService scheduledExecutorService,
final BufferAccumulator<Record<Event>> bufferAccumulator) {
final BufferAccumulator<Record<Event>> bufferAccumulator,
final AcknowledgementSetManager acknowledgementSetManager) {
this.searchAccessor = searchAccessor;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.buffer = buffer;
Expand All @@ -67,18 +72,19 @@ private OpenSearchService(final SearchAccessor searchAccessor,
this.openSearchIndexPartitionCreationSupplier = new OpenSearchIndexPartitionCreationSupplier(openSearchSourceConfiguration, (ClusterClientFactory) searchAccessor);
this.scheduledExecutorService = scheduledExecutorService;
this.bufferAccumulator = bufferAccumulator;
this.acknowledgementSetManager = acknowledgementSetManager;
}

public void start() {
switch(searchAccessor.getSearchContextType()) {
case POINT_IN_TIME:
searchWorker = new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier);
searchWorker = new PitWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager);
break;
case SCROLL:
searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier);
searchWorker = new ScrollWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager);
break;
case NONE:
searchWorker = new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier);
searchWorker = new NoSearchContextWorker(searchAccessor, openSearchSourceConfiguration, sourceCoordinator, bufferAccumulator, openSearchIndexPartitionCreationSupplier, acknowledgementSetManager);
break;
default:
throw new IllegalArgumentException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch;

import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
Expand All @@ -22,15 +23,18 @@ public class OpenSearchSource implements Source<Record<Event>>, UsesSourceCoordi

private final AwsCredentialsSupplier awsCredentialsSupplier;
private final OpenSearchSourceConfiguration openSearchSourceConfiguration;
private final AcknowledgementSetManager acknowledgementSetManager;

private SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private OpenSearchService openSearchService;

@DataPrepperPluginConstructor
public OpenSearchSource(final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final AwsCredentialsSupplier awsCredentialsSupplier) {
final AwsCredentialsSupplier awsCredentialsSupplier,
final AcknowledgementSetManager acknowledgementSetManager) {
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.awsCredentialsSupplier = awsCredentialsSupplier;
this.acknowledgementSetManager = acknowledgementSetManager;

openSearchSourceConfiguration.validateAwsConfigWithUsernameAndPassword();
}
Expand All @@ -50,10 +54,15 @@ private void startProcess(final OpenSearchSourceConfiguration openSearchSourceCo

final SearchAccessor searchAccessor = searchAccessorStrategy.getSearchAccessor();

openSearchService = OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer);
openSearchService = OpenSearchService.createOpenSearchService(searchAccessor, sourceCoordinator, openSearchSourceConfiguration, buffer, acknowledgementSetManager);
openSearchService.start();
}

@Override
public boolean areAcknowledgementsEnabled() {
return openSearchSourceConfiguration.isAcknowledgmentsEnabled();
}

@Override
public void stop() {
openSearchService.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ public class OpenSearchSourceConfiguration {
@JsonProperty("disable_authentication")
private Boolean disableAuthentication = false;

@JsonProperty("acknowledgments")
private Boolean acknowledgments = false;

@JsonProperty("connection")
@Valid
private ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration();
Expand Down Expand Up @@ -66,6 +69,10 @@ public String getPassword() {

public Boolean isAuthenticationDisabled() { return disableAuthentication; }

public Boolean isAcknowledgmentsEnabled() {
return acknowledgments;
}

public ConnectionConfiguration getConnectionConfiguration() {
return connectionConfiguration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

package org.opensearch.dataprepper.plugins.source.opensearch.worker;

import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
Expand All @@ -26,7 +29,10 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME;

Expand All @@ -41,17 +47,20 @@ public class NoSearchContextWorker implements SearchWorker, Runnable {
private final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator;
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;
private final AcknowledgementSetManager acknowledgementSetManager;

public NoSearchContextWorker(final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier) {
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier,
final AcknowledgementSetManager acknowledgementSetManager) {
this.searchAccessor = searchAccessor;
this.sourceCoordinator = sourceCoordinator;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.bufferAccumulator = bufferAccumulator;
this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier;
this.acknowledgementSetManager = acknowledgementSetManager;
}

@Override
Expand All @@ -73,12 +82,17 @@ public void run() {
}

try {
processIndex(indexPartition.get());
final Pair<AcknowledgementSet, CompletableFuture<Boolean>> acknowledgementSet = createAcknowledgmentSet(
acknowledgementSetManager,
openSearchSourceConfiguration,
sourceCoordinator,
indexPartition.get());

processIndex(indexPartition.get(), acknowledgementSet.getLeft());

completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet.getLeft(), acknowledgementSet.getRight(),
indexPartition.get(), sourceCoordinator);

sourceCoordinator.closePartition(
indexPartition.get().getPartitionKey(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
} catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) {
LOG.warn("The search_after worker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage());
sourceCoordinator.giveUpPartitions();
Expand All @@ -101,7 +115,8 @@ public void run() {
}
}

private void processIndex(final SourcePartition<OpenSearchIndexProgressState> openSearchIndexPartition) {
private void processIndex(final SourcePartition<OpenSearchIndexProgressState> openSearchIndexPartition,
final AcknowledgementSet acknowledgementSet) {
final String indexName = openSearchIndexPartition.getPartitionKey();
Optional<OpenSearchIndexProgressState> openSearchIndexProgressStateOptional = openSearchIndexPartition.getPartitionState();

Expand All @@ -124,6 +139,9 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op

searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {
try {
if (Objects.nonNull(acknowledgementSet)) {
acknowledgementSet.add(record.getData());
}
bufferAccumulator.add(record);
} catch (Exception e) {
LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
*/
package org.opensearch.dataprepper.plugins.source.opensearch.worker;

import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.SourceCoordinator;
Expand All @@ -30,7 +33,10 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.completeIndexPartition;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.WorkerCommonUtils.createAcknowledgmentSet;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.DOCUMENT_ID_METADATA_ATTRIBUTE_NAME;
import static org.opensearch.dataprepper.plugins.source.opensearch.worker.client.model.MetadataKeyAttributes.INDEX_METADATA_ATTRIBUTE_NAME;

Expand All @@ -56,16 +62,20 @@ public class PitWorker implements SearchWorker, Runnable {
private final BufferAccumulator<Record<Event>> bufferAccumulator;
private final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier;

private final AcknowledgementSetManager acknowledgementSetManager;

public PitWorker(final SearchAccessor searchAccessor,
final OpenSearchSourceConfiguration openSearchSourceConfiguration,
final SourceCoordinator<OpenSearchIndexProgressState> sourceCoordinator,
final BufferAccumulator<Record<Event>> bufferAccumulator,
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier) {
final OpenSearchIndexPartitionCreationSupplier openSearchIndexPartitionCreationSupplier,
final AcknowledgementSetManager acknowledgementSetManager) {
this.searchAccessor = searchAccessor;
this.sourceCoordinator = sourceCoordinator;
this.openSearchSourceConfiguration = openSearchSourceConfiguration;
this.bufferAccumulator = bufferAccumulator;
this.openSearchIndexPartitionCreationSupplier = openSearchIndexPartitionCreationSupplier;
this.acknowledgementSetManager = acknowledgementSetManager;
}

@Override
Expand All @@ -85,12 +95,17 @@ public void run() {
}

try {
processIndex(indexPartition.get());

sourceCoordinator.closePartition(
indexPartition.get().getPartitionKey(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getRate(),
openSearchSourceConfiguration.getSchedulingParameterConfiguration().getJobCount());
final Pair<AcknowledgementSet, CompletableFuture<Boolean>> acknowledgementSet = createAcknowledgmentSet(
acknowledgementSetManager,
openSearchSourceConfiguration,
sourceCoordinator,
indexPartition.get());

processIndex(indexPartition.get(), acknowledgementSet.getLeft());

completeIndexPartition(openSearchSourceConfiguration, acknowledgementSet.getLeft(), acknowledgementSet.getRight(),
indexPartition.get(), sourceCoordinator);
} catch (final PartitionUpdateException | PartitionNotFoundException | PartitionNotOwnedException e) {
LOG.warn("PitWorker received an exception from the source coordinator. There is a potential for duplicate data for index {}, giving up partition and getting next partition: {}", indexPartition.get().getPartitionKey(), e.getMessage());
sourceCoordinator.giveUpPartitions();
Expand Down Expand Up @@ -122,7 +137,8 @@ public void run() {
}
}

private void processIndex(final SourcePartition<OpenSearchIndexProgressState> openSearchIndexPartition) {
private void processIndex(final SourcePartition<OpenSearchIndexProgressState> openSearchIndexPartition,
final AcknowledgementSet acknowledgementSet) {
final String indexName = openSearchIndexPartition.getPartitionKey();
Optional<OpenSearchIndexProgressState> openSearchIndexProgressStateOptional = openSearchIndexPartition.getPartitionState();

Expand Down Expand Up @@ -160,6 +176,9 @@ private void processIndex(final SourcePartition<OpenSearchIndexProgressState> op

searchWithSearchAfterResults.getDocuments().stream().map(Record::new).forEach(record -> {
try {
if (Objects.nonNull(acknowledgementSet)) {
acknowledgementSet.add(record.getData());
}
bufferAccumulator.add(record);
} catch (Exception e) {
LOG.error("Failed writing OpenSearch documents to buffer. The last document created has document id '{}' from index '{}' : {}",
Expand Down
Loading

0 comments on commit ff6a915

Please sign in to comment.