Skip to content

Commit

Permalink
Logging update and config validation (opensearch-project#4541)
Browse files Browse the repository at this point in the history
* Logging improvements for export and stream processing for DocumentDB source

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

* Add validation for DocumentDB Collection Config

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>

---------

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
  • Loading branch information
dinujoh authored May 14, 2024
1 parent 0495fae commit 94fa30d
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.opensearch.dataprepper.plugins.mongo.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Pattern;

Expand All @@ -23,11 +25,17 @@ public class CollectionConfig {
private boolean stream;

@JsonProperty("partition_count")
@Min(1)
@Max(1000)
private int partitionCount;

@JsonProperty("export_batch_size")
@Min(1)
@Max(1_000_000)
private int exportBatchSize;
@JsonProperty("stream_batch_size")
@Min(1)
@Max(1_000_000)
private int streamBatchSize;

public CollectionConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ private PartitionIdentifierBatch buildPartitions(final ExportPartition exportPar
while (!Thread.currentThread().isInterrupted()) {
try (final MongoCursor<Document> startCursor = startIterable.iterator()) {
if (!startCursor.hasNext()) {
LOG.info("No records to process or has reached end of the export partition.");
isLastBatch = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void processStream(final StreamPartition streamPartition) {

try (MongoCursor<ChangeStreamDocument<Document>> cursor = getChangeStreamCursor(collection, resumeToken.orElse(null))) {
while ((shouldWaitForExport(streamPartition) || shouldWaitForS3Partition(streamPartition.getCollection())) && !Thread.currentThread().isInterrupted()) {
LOG.info("Initial load not complete for collection {}, waiting for initial lo be complete before resuming streams.", collectionDbName);
LOG.info("Initial load not complete for collection {}, waiting for initial load to be complete before resuming streams.", collectionDbName);
try {
Thread.sleep(DEFAULT_EXPORT_COMPLETE_WAIT_INTERVAL_MILLIS);
} catch (final InterruptedException ex) {
Expand All @@ -212,6 +212,7 @@ public void processStream(final StreamPartition streamPartition) {
throw new IllegalStateException("S3 partitions are not created. Please check the S3 partition creator thread.");
}
recordConverter.initializePartitions(s3Partitions);
LOG.info("Starting to watch streams for change events.");
while (!Thread.currentThread().isInterrupted() && !stopWorker) {
if (cursor.hasNext()) {
try {
Expand Down

0 comments on commit 94fa30d

Please sign in to comment.