DocumentDB/MongoDB source initial checkpoint progress and other improvements (#4293)

* DocumentDB/MongoDB source initial checkpoint progress and other improvements

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

* Undo typo

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

---------

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
dinujoh committed Mar 18, 2024
1 parent f05703e commit 43edcb5
Showing 13 changed files with 538 additions and 245 deletions.
MongoDBConnection.java
@@ -23,7 +23,7 @@ public static MongoClient getMongoClient(final MongoDBSourceConfig sourceConfig)
if (Objects.nonNull(sourceConfig.getTrustStoreFilePath())) {
final File truststoreFilePath = new File(sourceConfig.getTrustStoreFilePath());
settingBuilder.applyToSslSettings(builder -> {
- builder.enabled(sourceConfig.getInsecure());
+ builder.enabled(sourceConfig.getTls());
builder.invalidHostNameAllowed(sourceConfig.getSslInsecureDisableVerification());
builder.context(TrustStoreProvider.createSSLContext(truststoreFilePath.toPath(),
sourceConfig.getTrustStorePassword()));
@@ -38,11 +38,11 @@ private static String getConnectionString(final MongoDBSourceConfig sourceConfig
final String password = sourceConfig.getCredentialsConfig().getPassword();
final String hostname = sourceConfig.getHostname();
final int port = sourceConfig.getPort();
- final String ssl = sourceConfig.getInsecure().toString();
+ final String tls = sourceConfig.getTls().toString();
final String invalidHostAllowed = sourceConfig.getSslInsecureDisableVerification().toString();
final String readPreference = sourceConfig.getReadPreference();
final String directionConnection = sourceConfig.getDirectConnection().toString();
return String.format(MONGO_CONNECTION_STRING_TEMPLATE, username, password, hostname, port,
- readPreference, ssl, invalidHostAllowed, directionConnection);
+ readPreference, tls, invalidHostAllowed, directionConnection);
}
}
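For orientation, the eight values handed to String.format above (username, password, hostname, port, readPreference, tls, invalidHostAllowed, directionConnection) imply a connection-string template along these lines. The constant itself is outside this hunk, so this is a plausible sketch, not the committed value:

// Sketch only: MONGO_CONNECTION_STRING_TEMPLATE does not appear in this diff,
// so the option names and their order here are assumptions inferred from the
// eight String.format arguments above.
private static final String MONGO_CONNECTION_STRING_TEMPLATE =
        "mongodb://%s:%s@%s:%d/?readPreference=%s&tls=%s"
        + "&tlsAllowInvalidHostnames=%s&directConnection=%s";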
MongoDBSourceConfig.java
@@ -3,6 +3,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.NotNull;

+ import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

@@ -11,7 +12,9 @@ public class MongoDBSourceConfig {
private static final Boolean DEFAULT_INSECURE = false;
private static final Boolean DEFAULT_INSECURE_DISABLE_VERIFICATION = false;
private static final String DEFAULT_SNAPSHOT_FETCH_SIZE = "1000";
- private static final String DEFAULT_READ_PREFERENCE = "secondaryPreferred";
+ private static final String DEFAULT_READ_PREFERENCE = "primaryPreferred";
+ private static final Boolean DEFAULT_DIRECT_CONNECT = true;
+ private static final Duration DEFAULT_ACKNOWLEDGEMENT_SET_TIMEOUT = Duration.ofHours(2);
@JsonProperty("hostname")
private @NotNull String hostname;
@JsonProperty("port")
@@ -31,6 +34,10 @@ public class MongoDBSourceConfig {
private List<CollectionConfig> collections;
@JsonProperty("acknowledgments")
private Boolean acknowledgments = false;
+
+ @JsonProperty
+ private Duration partitionAcknowledgmentTimeout;
+
@JsonProperty("insecure")
private Boolean insecure;
@JsonProperty("ssl_insecure_disable_verification")
@@ -44,6 +51,8 @@ public MongoDBSourceConfig() {
this.collections = new ArrayList<>();
this.insecure = DEFAULT_INSECURE;
this.sslInsecureDisableVerification = DEFAULT_INSECURE_DISABLE_VERIFICATION;
+ this.directConnection = DEFAULT_DIRECT_CONNECT;
+ this.partitionAcknowledgmentTimeout = DEFAULT_ACKNOWLEDGEMENT_SET_TIMEOUT;
}

public CredentialsConfig getCredentialsConfig() {
@@ -66,8 +75,8 @@ public String getTrustStorePassword() {
return this.trustStorePassword;
}

- public Boolean getInsecure() {
-     return this.insecure;
+ public Boolean getTls() {
+     return !this.insecure;
}

public Boolean getSslInsecureDisableVerification() {
@@ -90,6 +99,10 @@ public boolean isAcknowledgmentsEnabled() {
return this.acknowledgments;
}

+ public Duration getPartitionAcknowledgmentTimeout() {
+     return this.partitionAcknowledgmentTimeout;
+ }
+
public static class CredentialsConfig {
@JsonProperty("username")
private String username;
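One behavioral note on the getter rename above: getTls() negates the stored insecure flag instead of introducing a separate field. A minimal sketch of the observable defaults, assuming only the constructor and getters shown in this diff:

import java.time.Duration;

// Minimal sketch: with the defaults set in the constructor above
// (insecure = false, ack timeout = 2 hours), TLS reports as enabled.
public class ConfigDefaultsSketch {
    public static void main(String[] args) {
        final MongoDBSourceConfig config = new MongoDBSourceConfig();
        System.out.println(config.getTls());                             // true (= !insecure)
        System.out.println(config.getSslInsecureDisableVerification());  // false
        System.out.println(config.getPartitionAcknowledgmentTimeout()
                .equals(Duration.ofHours(2)));                           // true
    }
}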
ExportProgressState.java
@@ -18,6 +18,9 @@ public class ExportProgressState {
@JsonProperty("exportTime")
private String exportTime;

+ @JsonProperty("status")
+ private String status;
+

public String getDatabaseName() {
return databaseName;
@@ -42,4 +45,12 @@ public String getExportTime() {
public void setExportTime(String exportTime) {
this.exportTime = exportTime;
}
+
+ public String getStatus() {
+     return status;
+ }
+
+ public void setStatus(String status) {
+     this.status = status;
+ }
}
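Because the class is a plain Jackson-annotated state object, the new field rides along in the serialized progress state automatically. An illustrative round-trip; the "COMPLETED" value is an invented example, not a status defined by this commit:

import com.fasterxml.jackson.databind.ObjectMapper;

// Illustrative only: serializes the progress state including the new field.
public class ExportProgressStateSketch {
    public static void main(String[] args) throws Exception {
        final ExportProgressState state = new ExportProgressState();
        state.setExportTime("2024-03-18T00:00:00Z"); // hypothetical timestamp
        state.setStatus("COMPLETED");                // hypothetical status value
        System.out.println(new ObjectMapper().writeValueAsString(state));
    }
}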
DataQueryPartitionCheckpoint.java (new file)
@@ -0,0 +1,59 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.mongo.export;

import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.DataQueryPartition;
import org.opensearch.dataprepper.plugins.mongo.coordination.state.DataQueryProgressState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Optional;

/**
* A helper class to handle the data query partition status and the progress state.
* It will use coordinator APIs under the hood.
*/
public class DataQueryPartitionCheckpoint {
private static final Logger LOG = LoggerFactory.getLogger(DataQueryPartitionCheckpoint.class);

static final Duration CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE = Duration.ofMinutes(5);


private final EnhancedSourceCoordinator enhancedSourceCoordinator;

private final DataQueryPartition dataQueryPartition;


public DataQueryPartitionCheckpoint(EnhancedSourceCoordinator enhancedSourceCoordinator, DataQueryPartition dataQueryPartition) {
this.enhancedSourceCoordinator = enhancedSourceCoordinator;
this.dataQueryPartition = dataQueryPartition;
}

private void setProgressState(long records) {
//Always has a state.
Optional<DataQueryProgressState> progressState = dataQueryPartition.getProgressState();
progressState.get().setLoadedRecords(records);
}

/**
* This method does a checkpoint with the latest sequence number processed.
* Note that this should be called on a regular basis even if there are no changes to the sequence number,
* as the checkpoint will also extend the timeout for the lease.
*
* @param recordNumber The last record number
*/
public void checkpoint(int recordNumber) {
LOG.debug("Checkpoint partition query " + dataQueryPartition.getQuery() + " with record number " + recordNumber);
setProgressState(recordNumber);
enhancedSourceCoordinator.saveProgressStateForPartition(dataQueryPartition, CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE);
}

public void updateDatafileForAcknowledgmentWait(final Duration acknowledgmentSetTimeout) {
enhancedSourceCoordinator.saveProgressStateForPartition(dataQueryPartition, acknowledgmentSetTimeout);
}
}
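The javadoc above is the key contract: checkpointing on a cadence renews the partition lease even when the progress value has not changed. A sketch of the calling pattern (ExportPartitionWorker below follows it with a 2-minute interval); the driver loop here is hypothetical:

// Hypothetical calling pattern for DataQueryPartitionCheckpoint: checkpoint
// on a fixed interval so the ownership lease keeps getting extended.
public class CheckpointCadenceSketch {
    static final long CHECKPOINT_INTERVAL_MILLIS = 2 * 60_000L;

    public static void drive(final DataQueryPartitionCheckpoint checkpoint,
                             final int totalRecords) {
        long lastCheckpointTime = System.currentTimeMillis();
        int recordsProcessed = 0;
        for (int i = 0; i < totalRecords; i++) {   // stands in for the cursor loop
            recordsProcessed++;
            if (System.currentTimeMillis() - lastCheckpointTime > CHECKPOINT_INTERVAL_MILLIS) {
                checkpoint.checkpoint(recordsProcessed);  // also renews the lease
                lastCheckpointTime = System.currentTimeMillis();
            }
        }
        checkpoint.checkpoint(recordsProcessed);          // final checkpoint
    }
}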
ExportPartitionWorker.java (new file)
@@ -0,0 +1,188 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.plugins.mongo.export;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import io.micrometer.core.instrument.Counter;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.plugins.mongo.buffer.RecordBufferWriter;
import org.opensearch.dataprepper.plugins.mongo.client.BsonHelper;
import org.opensearch.dataprepper.plugins.mongo.client.MongoDBConnection;
import org.opensearch.dataprepper.plugins.mongo.configuration.MongoDBSourceConfig;
import org.opensearch.dataprepper.plugins.mongo.coordination.partition.DataQueryPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

public class ExportPartitionWorker implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(ExportPartitionWorker.class);
private static final int PARTITION_KEY_PARTS = 4;
static final String SUCCESS_ITEM_COUNTER_NAME = "exportRecordsSuccessTotal";
static final String FAILURE_ITEM_COUNTER_NAME = "exportRecordsFailedTotal";
private static final String PARTITION_KEY_SPLITTER = "\\|";
private static final String COLLECTION_SPLITTER = "\\.";

/**
* Number of lines to be read in a batch
*/
private static final int DEFAULT_BATCH_SIZE = 100;
/**
* Start Line is the checkpoint
*/
private final int startLine;

/**
* Default regular checkpoint interval
*/
private static final int DEFAULT_CHECKPOINT_INTERVAL_MILLS = 2 * 60_000;

/**
* A flag to interrupt the process
*/
private static volatile boolean shouldStop = false;

private final MongoDBSourceConfig sourceConfig;
private final Counter successItemsCounter;
private final Counter failureItemsCounter;
private final RecordBufferWriter recordBufferWriter;
private final DataQueryPartition dataQueryPartition;
private final AcknowledgementSet acknowledgementSet;
private final DataQueryPartitionCheckpoint partitionCheckpoint;


public ExportPartitionWorker(final RecordBufferWriter recordBufferWriter,
final DataQueryPartition dataQueryPartition,
final AcknowledgementSet acknowledgementSet,
final MongoDBSourceConfig sourceConfig,
final DataQueryPartitionCheckpoint partitionCheckpoint,
final PluginMetrics pluginMetrics) {
this.recordBufferWriter = recordBufferWriter;
this.dataQueryPartition = dataQueryPartition;
this.acknowledgementSet = acknowledgementSet;
this.sourceConfig = sourceConfig;
this.partitionCheckpoint = partitionCheckpoint;
this.startLine = 0; // replace it with checkpoint line
this.successItemsCounter = pluginMetrics.counter(SUCCESS_ITEM_COUNTER_NAME);
this.failureItemsCounter = pluginMetrics.counter(FAILURE_ITEM_COUNTER_NAME);
}

@Override
public void run() {
final List<String> partitionKeys = List.of(dataQueryPartition.getPartitionKey().split(PARTITION_KEY_SPLITTER));
if (partitionKeys.size() < PARTITION_KEY_PARTS) {
throw new RuntimeException("Invalid Partition Key. Must be in db.collection|gte|lte format. Key: " + dataQueryPartition.getPartitionKey());
}
final List<String> collection = List.of(partitionKeys.get(0).split(COLLECTION_SPLITTER));
final String gte = partitionKeys.get(1);
final String lte = partitionKeys.get(2);
final String className = partitionKeys.get(3);
if (collection.size() < 2) {
throw new RuntimeException("Invalid Collection Name. Must be in db.collection format");
}
long lastCheckpointTime = System.currentTimeMillis();
try (final MongoClient mongoClient = MongoDBConnection.getMongoClient(sourceConfig)) {
final MongoDatabase db = mongoClient.getDatabase(collection.get(0));
final MongoCollection<Document> col = db.getCollection(collection.get(1));
final Bson query = BsonHelper.buildAndQuery(gte, lte, className);
long totalRecords = 0L;
long successRecords = 0L;
long failedRecords = 0L;

// line count regardless of the start line number
int recordCount = 0;
int lastRecordNumberProcessed = 0;
final List<String> records = new ArrayList<>();
try (MongoCursor<Document> cursor = col.find(query).iterator()) {
while (cursor.hasNext()) {
if (shouldStop) {
partitionCheckpoint.checkpoint(lastRecordNumberProcessed);
LOG.warn("Loading data query {} was interrupted by a shutdown signal, giving up ownership of " +
"query partition", query);
throw new RuntimeException("Loading data query interrupted");
}
totalRecords += 1;
recordCount += 1;
if (totalRecords <= startLine) {
continue;
}

try {
final JsonWriterSettings writerSettings = JsonWriterSettings.builder()
.outputMode(JsonMode.RELAXED)
.objectIdConverter((value, writer) -> writer.writeString(value.toHexString()))
.build();
final String record = cursor.next().toJson(writerSettings);
records.add(record);

if ((recordCount - startLine) % DEFAULT_BATCH_SIZE == 0) {
LOG.debug("Write to buffer for line " + (recordCount - DEFAULT_BATCH_SIZE) + " to " + recordCount);
recordBufferWriter.writeToBuffer(acknowledgementSet, records);
records.clear();
lastRecordNumberProcessed = recordCount;
// checkpointing in finally block when all records are processed
}

if (System.currentTimeMillis() - lastCheckpointTime > DEFAULT_CHECKPOINT_INTERVAL_MILLS) {
LOG.debug("Perform regular checkpointing for Data Query Loader");
partitionCheckpoint.checkpoint(lastRecordNumberProcessed);
lastCheckpointTime = System.currentTimeMillis();

}

successItemsCounter.increment();
successRecords += 1;
} catch (Exception e) {
LOG.error("Failed to add record to buffer with error {}", e.getMessage());
failureItemsCounter.increment();
failedRecords += 1;
}
}

if (!records.isEmpty()) {
// Do final checkpoint.
// If all records were written to buffer, checkpoint will be done in finally block
recordBufferWriter.writeToBuffer(acknowledgementSet, records);
partitionCheckpoint.checkpoint(recordCount);
}

records.clear();

LOG.info("Completed writing query partition: {} to buffer", query);

if (acknowledgementSet != null) {
partitionCheckpoint.updateDatafileForAcknowledgmentWait(sourceConfig.getPartitionAcknowledgmentTimeout());
acknowledgementSet.complete();
}

} catch (Exception e) {
LOG.error("Exception connecting to cluster and loading partition {}. Exception: {}", query, e.getMessage());
throw new RuntimeException(e);
} finally {
// Do final checkpoint when reaching end of partition or due to exception
partitionCheckpoint.checkpoint(recordCount);
}

LOG.info("Records processed: {}, recordCount: {}", totalRecords, recordCount);
}
}

/**
* Currently, this is to stop all consumers.
*/
public static void stopAll() {
shouldStop = true;
}
}
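The first step of run() is worth seeing in isolation: the partition key must arrive as db.collection|gte|lte|className, split first on | and then on the dot. A self-contained sketch of just that parsing; the example key is invented:

import java.util.List;

// Self-contained sketch of the partition-key parsing done at the top of run().
public class PartitionKeySketch {
    public static void main(String[] args) {
        final String partitionKey = "myDb.myCollection|100|200|java.lang.Integer";
        final List<String> parts = List.of(partitionKey.split("\\|"));
        if (parts.size() < 4) {
            throw new RuntimeException("Invalid Partition Key: " + partitionKey);
        }
        final List<String> collection = List.of(parts.get(0).split("\\."));
        if (collection.size() < 2) {
            throw new RuntimeException("Invalid Collection Name: " + parts.get(0));
        }
        System.out.println("database=" + collection.get(0)
                + ", collection=" + collection.get(1)
                + ", gte=" + parts.get(1)
                + ", lte=" + parts.get(2)
                + ", className=" + parts.get(3));
    }
}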
