
Commit 92b28a3

Add mongodb as alternate source name for documentdb source (opensearch-project#4969)

Signed-off-by: Dinu John <86094133+dinujoh@users.noreply.github.com>
dinujoh authored Sep 22, 2024
1 parent 8abf1d5 commit 92b28a3
Showing 3 changed files with 86 additions and 1 deletion.
DocumentDBSource.java
@@ -23,7 +23,7 @@
 import java.util.function.Function;
 
 
-@DataPrepperPlugin(name = "documentdb", pluginType = Source.class, pluginConfigurationType = MongoDBSourceConfig.class)
+@DataPrepperPlugin(name = "documentdb", alternateNames = "mongodb", pluginType = Source.class, pluginConfigurationType = MongoDBSourceConfig.class)
 public class DocumentDBSource implements Source<Record<Event>>, UsesEnhancedSourceCoordination {
     private static final Logger LOG = LoggerFactory.getLogger(DocumentDBSource.class);
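With the alternate name registered, a pipeline can declare its source as mongodb and still resolve to DocumentDBSource. A minimal sketch of such a pipeline follows; the s3_bucket, s3_region, s3_prefix, and aws.sts_role_arn keys are the ones the new template reads, while host, port, collections, and the opensearch sink are illustrative assumptions, not part of this diff:

ingestion-pipeline:
  source:
    mongodb:
      host: "my-cluster.example.com"       # illustrative endpoint, not from this diff
      port: 27017                          # illustrative port
      collections:
        - collection: "mydb.mycollection"  # illustrative "db.collection" pair
      s3_bucket: "my-export-bucket"        # read by the template as source.mongodb.s3_bucket
      s3_region: "us-east-1"
      s3_prefix: "mongodb/export"
      aws:
        sts_role_arn: "arn:aws:iam::123456789012:role/pipeline-role"
  sink:
    - opensearch:                          # illustrative user sink; the template moves user sinks to the -s3 pipeline
        hosts: ["https://search.example.com"]
        index: "mongodb-index"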
New file: transformation rule for the mongodb source
@@ -0,0 +1,4 @@
+plugin_name: "mongodb"
+apply_when:
+  - "$..source.mongodb"
+  - "$..source.mongodb.s3_bucket"
New file: pipeline template for the mongodb source
@@ -0,0 +1,81 @@
"<<pipeline-name>>":
workers: "<<$.<<pipeline-name>>.workers>>"
delay: "<<$.<<pipeline-name>>.delay>>"
buffer: "<<$.<<pipeline-name>>.buffer>>"
source:
mongodb: "<<$.<<pipeline-name>>.source.mongodb>>"
routes:
- initial_load: 'getMetadata("ingestion_type") == "EXPORT"'
- stream_load: 'getMetadata("ingestion_type") == "STREAM"'
sink:
- s3:
routes:
- initial_load
aws:
region: "<<$.<<pipeline-name>>.source.mongodb.s3_region>>"
sts_role_arn: "<<$.<<pipeline-name>>.source.mongodb.aws.sts_role_arn>>"
sts_external_id: "<<$.<<pipeline-name>>.source.mongodb.aws.sts_external_id>>"
sts_header_overrides: "<<$.<<pipeline-name>>.source.mongodb.aws.sts_header_overrides>>"
bucket: "<<$.<<pipeline-name>>.source.mongodb.s3_bucket>>"
threshold:
event_collect_timeout: "120s"
maximum_size: "2mb"
aggregate_threshold:
maximum_size: "128mb"
flush_capacity_ratio: 0
object_key:
path_prefix: "${getMetadata(\"s3_partition_key\")}"
codec:
event_json:
default_bucket_owner: "<<FUNCTION_NAME:getAccountIdFromRole,PARAMETER:$.<<pipeline-name>>.source.mongodb.aws.sts_role_arn>>"
- s3:
routes:
- stream_load
aws:
region: "<<$.<<pipeline-name>>.source.mongodb.s3_region>>"
sts_role_arn: "<<$.<<pipeline-name>>.source.mongodb.aws.sts_role_arn>>"
sts_external_id: "<<$.<<pipeline-name>>.source.mongodb.aws.sts_external_id>>"
sts_header_overrides: "<<$.<<pipeline-name>>.source.mongodb.aws.sts_header_overrides>>"
bucket: "<<$.<<pipeline-name>>.source.mongodb.s3_bucket>>"
threshold:
event_collect_timeout: "15s"
maximum_size: "1mb"
aggregate_threshold:
maximum_size: "128mb"
flush_capacity_ratio: 0
object_key:
path_prefix: "${getMetadata(\"s3_partition_key\")}"
codec:
event_json:
default_bucket_owner: "<<FUNCTION_NAME:getAccountIdFromRole,PARAMETER:$.<<pipeline-name>>.source.mongodb.aws.sts_role_arn>>"
"<<pipeline-name>>-s3":
workers: "<<$.<<pipeline-name>>.workers>>"
delay: "<<$.<<pipeline-name>>.delay>>"
buffer: "<<$.<<pipeline-name>>.buffer>>"
source:
s3:
codec:
event_json:
compression: "none"
aws:
region: "<<$.<<pipeline-name>>.source.mongodb.s3_region>>"
sts_role_arn: "<<$.<<pipeline-name>>.source.mongodb.aws.sts_role_arn>>"
sts_external_id: "<<$.<<pipeline-name>>.source.mongodb.aws.sts_external_id>>"
sts_header_overrides: "<<$.<<pipeline-name>>.source.mongodb.aws.sts_header_overrides>>"
acknowledgments: true
delete_s3_objects_on_read: true
disable_s3_metadata_in_event: true
scan:
folder_partitions:
depth: "<<FUNCTION_NAME:calculateDepth,PARAMETER:$.<<pipeline-name>>.source.mongodb.s3_prefix>>"
max_objects_per_ownership: 50
buckets:
- bucket:
name: "<<$.<<pipeline-name>>.source.mongodb.s3_bucket>>"
filter:
include_prefix: ["<<FUNCTION_NAME:getSourceCoordinationIdentifierEnvVariable,PARAMETER:$.<<pipeline-name>>.source.mongodb.s3_prefix>>"]
scheduling:
interval: "60s"
processor: "<<$.<<pipeline-name>>.processor>>"
sink: "<<$.<<pipeline-name>>.sink>>"
routes: "<<$.<<pipeline-name>>.routes>>" # In placeholder, routes or route (defined as alias) will be transformed to route in json as route will be primarily picked in pipelineModel.
