Skip to content

Commit

Permalink
Add DocumentDB/MongoDB source for initial load (opensearch-project#4285)
Browse files Browse the repository at this point in the history
* Add DocumentDB/MongoDB source for initial load

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

* Update unit test parameter

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

---------

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
  • Loading branch information
dinujoh authored Mar 14, 2024
1 parent 7cbfb0a commit 83d735e
Show file tree
Hide file tree
Showing 11 changed files with 683 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class ExportRecordBufferWriter extends RecordBufferWriter {
private final DistributionSummary bytesReceivedSummary;
private final DistributionSummary bytesProcessedSummary;

public ExportRecordBufferWriter(final BufferAccumulator<Record<Event>> bufferAccumulator,
private ExportRecordBufferWriter(final BufferAccumulator<Record<Event>> bufferAccumulator,
final CollectionConfig collectionConfig,
final RecordConverter recordConverter,
final PluginMetrics pluginMetrics,
Expand All @@ -44,6 +44,14 @@ public ExportRecordBufferWriter(final BufferAccumulator<Record<Event>> bufferAcc
this.exportStartTime = exportStartTime;
}

public static ExportRecordBufferWriter create(final BufferAccumulator<Record<Event>> bufferAccumulator,
final CollectionConfig collectionConfig,
final RecordConverter recordConverter,
final PluginMetrics pluginMetrics,
final long exportStartTime) {
return new ExportRecordBufferWriter(bufferAccumulator, collectionConfig, recordConverter, pluginMetrics, exportStartTime);
}

@Override
public void writeToBuffer(final AcknowledgementSet acknowledgementSet,
final List<String> records) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private String getAttributeValue(final Map<String, Object> data, final String at
* @param eventCreationTimeMillis Creation timestamp of the event
* @param eventVersionNumber Event version number to handle conflicts
* @param eventName Event name
* @return Jackson document event
*/
public Event convert(final String record,
final long eventCreationTimeMillis,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,27 @@ public class DataQueryProgressState {


@JsonProperty("executedQueries")
private int executedQueries;
private long executedQueries;

@JsonProperty("loadedRecords")
private int loadedRecords;
private long loadedRecords;

@JsonProperty("exportStartTime")
private long startTime;

public int getExecutedQueries() {
public long getExecutedQueries() {
return executedQueries;
}

public int getLoadedRecords() {
public long getLoadedRecords() {
return loadedRecords;
}

public void setExecutedQueries(int executedQueries) {
public void setExecutedQueries(long executedQueries) {
this.executedQueries = executedQueries;
}

public void setLoadedRecords(int loadedRecords) {
public void setLoadedRecords(long loadedRecords) {
this.loadedRecords = loadedRecords;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package org.opensearch.dataprepper.plugins.mongo.documentdb;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.mongo.export.MongoDBExportPartitionSupplier;
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.opensearch.dataprepper.plugins.mongo.export.ExportScheduler;
import org.opensearch.dataprepper.plugins.mongo.export.ExportWorker;
import org.opensearch.dataprepper.plugins.mongo.leader.LeaderScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Orchestrates the long-running workers for the DocumentDB/MongoDB source:
 * a leader scheduler, an export scheduler, and an export worker, each running
 * on its own dedicated thread.
 */
public class DocumentDBService {
    private static final Logger LOG = LoggerFactory.getLogger(DocumentDBService.class);

    private final EnhancedSourceCoordinator sourceCoordinator;
    private final PluginMetrics pluginMetrics;
    private final MongoDBSourceConfig sourceConfig;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private final MongoDBExportPartitionSupplier mongoDBExportPartitionSupplier;
    private final ExecutorService executor;
    private ExportScheduler exportScheduler;
    private ExportWorker exportWorker;
    private LeaderScheduler leaderScheduler;

    public DocumentDBService(final EnhancedSourceCoordinator sourceCoordinator,
                             final MongoDBSourceConfig sourceConfig,
                             final PluginMetrics pluginMetrics,
                             final AcknowledgementSetManager acknowledgementSetManager) {
        this.sourceCoordinator = sourceCoordinator;
        this.sourceConfig = sourceConfig;
        this.pluginMetrics = pluginMetrics;
        this.acknowledgementSetManager = acknowledgementSetManager;
        this.mongoDBExportPartitionSupplier = new MongoDBExportPartitionSupplier(sourceConfig);
        // One thread per scheduler/worker submitted in start().
        this.executor = Executors.newFixedThreadPool(3);
    }

    /**
     * This service start three long-running threads (scheduler)
     * Each thread is responsible for one type of job.
     * The data will be guaranteed to be sent to {@link Buffer} in order.
     *
     * @param buffer Data Prepper Buffer
     */
    public void start(final Buffer<Record<Event>> buffer) {
        leaderScheduler = new LeaderScheduler(sourceCoordinator, sourceConfig.getCollections());
        exportScheduler = new ExportScheduler(sourceCoordinator, mongoDBExportPartitionSupplier, pluginMetrics);
        exportWorker = new ExportWorker(sourceCoordinator, buffer, pluginMetrics, acknowledgementSetManager, sourceConfig);

        // Leader first, then export scheduling, then the worker consuming data queries.
        executor.submit(leaderScheduler);
        executor.submit(exportScheduler);
        executor.submit(exportWorker);
    }

    /**
     * Interrupt the running of schedulers.
     * Each scheduler must implement logic for gracefully shutdown.
     */
    public void shutdown() {
        LOG.info("shutdown DocumentDB Service scheduler and worker");
        // shutdownNow() interrupts the three scheduler threads; each is expected
        // to observe the interrupt and exit gracefully.
        executor.shutdownNow();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.opensearch.dataprepper.plugins.mongo.documentdb;

import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.Source;
import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.UsesEnhancedSourceCoordination;
import org.opensearch.dataprepper.plugins.mongo.coordination.PartitionFactory;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.LeaderPartition;
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;


@DataPrepperPlugin(name = "documentdb", pluginType = Source.class, pluginConfigurationType = MongoDBSourceConfig.class)
public class DocumentDBSource implements Source<Record<Event>>, UsesEnhancedSourceCoordination {
    private static final Logger LOG = LoggerFactory.getLogger(DocumentDBSource.class);

    private final PluginMetrics pluginMetrics;
    private final MongoDBSourceConfig sourceConfig;
    private EnhancedSourceCoordinator sourceCoordinator;
    private final AcknowledgementSetManager acknowledgementSetManager;
    private DocumentDBService documentDBService;

    @DataPrepperPluginConstructor
    public DocumentDBSource(final PluginMetrics pluginMetrics,
                            final MongoDBSourceConfig sourceConfig,
                            final AcknowledgementSetManager acknowledgementSetManager) {
        this.pluginMetrics = pluginMetrics;
        this.sourceConfig = sourceConfig;
        this.acknowledgementSetManager = acknowledgementSetManager;
        // Fix: the previous version allocated Executors.newFixedThreadPool(2) here,
        // but the pool was never used and never shut down — a thread leak.
        // All worker threads are owned and managed by DocumentDBService.
    }

    /**
     * Creates the leader partition and starts the {@link DocumentDBService},
     * which runs the schedulers that feed records into the buffer.
     *
     * @param buffer Data Prepper Buffer
     * @throws NullPointerException if the source coordinator has not been set yet
     */
    @Override
    public void start(final Buffer<Record<Event>> buffer) {
        Objects.requireNonNull(sourceCoordinator, "sourceCoordinator must be set before start");
        sourceCoordinator.createPartition(new LeaderPartition());

        documentDBService = new DocumentDBService(sourceCoordinator, sourceConfig, pluginMetrics,
                acknowledgementSetManager);

        LOG.info("Start DocumentDB service");
        documentDBService.start(buffer);
    }

    /**
     * Stops the service if it was started; safe to call before {@link #start}.
     */
    @Override
    public void stop() {
        LOG.info("Stop DocumentDB service");
        if (Objects.nonNull(documentDBService)) {
            documentDBService.shutdown();
        }
    }

    @Override
    public void setEnhancedSourceCoordinator(final EnhancedSourceCoordinator sourceCoordinator) {
        this.sourceCoordinator = sourceCoordinator;
        this.sourceCoordinator.initialize();
    }

    /** Partition factory used by the enhanced source coordinator to rebuild partitions. */
    @Override
    public Function<SourcePartitionStoreItem, EnhancedSourcePartition> getPartitionFactory() {
        return new PartitionFactory();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void run() {

final List<PartitionIdentifier> partitionIdentifiers = mongoDBExportPartitionSupplier.apply(exportPartition);

createDataQueryPartitions(exportPartition.getPartitionKey(), Instant.now(), partitionIdentifiers);
createDataQueryPartitions(exportPartition.getCollection(), Instant.now(), partitionIdentifiers);
}
try {
Thread.sleep(DEFAULT_TAKE_LEASE_INTERVAL_MILLIS);
Expand Down
Loading

0 comments on commit 83d735e

Please sign in to comment.