Commit: Add template

Signed-off-by: Hai Yan <oeyh@amazon.com>
oeyh committed Sep 13, 2024
1 parent c28cc62 commit 42aec4c
Showing 9 changed files with 168 additions and 21 deletions.
@@ -77,6 +77,7 @@ public class DynamicConfigTransformer implements PipelineConfigurationTransformer
private static final String SINK_SUBPIPELINE_PLUGIN_NAME = "pipeline";
private static final String SUBPIPELINE_PATH = "$.source.pipeline";

private static final String S3_BUFFER_PREFIX = "/buffer";

Configuration parseConfigWithJsonNode = Configuration.builder()
.jsonProvider(new JacksonJsonNodeJsonProvider())
@@ -402,15 +403,28 @@ private boolean isJsonPath(String parameter) {
    }

    /**
-    * Specific to DocDB depth field.
-    * @param s3Prefix
-    * @return
+    * Calculate s3 folder scan depth for DocDB source pipeline
+    * @param s3Prefix: s3 prefix defined in the source configuration
+    * @return s3 folder scan depth
     */
    public String calculateDepth(String s3Prefix) {
+       return Integer.toString(getDepth(s3Prefix, 4));
+   }
+
+   /**
+    * Calculate s3 folder scan depth for RDS source pipeline
+    * @param s3Prefix: s3 prefix defined in the source configuration
+    * @return s3 folder scan depth
+    */
+   public String calculateDepthForRdsSource(String s3Prefix) {
+       return Integer.toString(getDepth(s3Prefix, 3));
+   }
+
+   private int getDepth(String s3Prefix, int baseDepth) {
        if(s3Prefix == null){
-           return Integer.toString(4);
+           return baseDepth;
        }
-       return Integer.toString(s3Prefix.split("/").length + 4);
+       return s3Prefix.split("/").length + baseDepth;
    }

public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){
@@ -421,6 +435,23 @@ public String getSourceCoordinationIdentifierEnvVariable(String s3Prefix){
return s3Prefix+"/"+envSourceCoordinationIdentifier;
}

/**
* Get the include_prefix in s3 scan source. This is a function specific to RDS source.
* @param s3Prefix: s3 prefix defined in the source configuration
* @return the actual include_prefix
*/
public String getIncludePrefixForRdsSource(String s3Prefix) {
String envSourceCoordinationIdentifier = System.getenv(SOURCE_COORDINATION_IDENTIFIER_ENVIRONMENT_VARIABLE);
if (s3Prefix == null && envSourceCoordinationIdentifier == null) {
return "";
} else if (s3Prefix == null) {
return envSourceCoordinationIdentifier + S3_BUFFER_PREFIX;
} else if (envSourceCoordinationIdentifier == null) {
return s3Prefix + S3_BUFFER_PREFIX;
}
return s3Prefix + "/" + envSourceCoordinationIdentifier + S3_BUFFER_PREFIX;
}

public String getAccountIdFromRole(final String roleArn) {
return Arn.fromString(roleArn).accountId().orElse(null);
}
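For illustration only, a sketch of what the two new helpers return (hypothetical values, not part of the commit), assuming the source coordination identifier environment variable is set to "id-1":

// calculateDepthForRdsSource("one/two")   -> "5"   (2 prefix segments + base depth 3)
// calculateDepthForRdsSource(null)        -> "3"   (base depth only)
// getIncludePrefixForRdsSource("one/two") -> "one/two/id-1/buffer"
// getIncludePrefixForRdsSource(null)      -> "id-1/buffer"
// With both s3Prefix and the identifier unset, getIncludePrefixForRdsSource returns "".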
@@ -156,17 +156,16 @@ private SchemaManager getSchemaManager(final RdsSourceConfig sourceConfig, final
    private String getS3PathPrefix() {
        final String s3UserPathPrefix;
        if (sourceConfig.getS3Prefix() != null && !sourceConfig.getS3Prefix().isBlank()) {
-           s3UserPathPrefix = sourceConfig.getS3Prefix() + S3_PATH_DELIMITER;
+           s3UserPathPrefix = sourceConfig.getS3Prefix();
        } else {
            s3UserPathPrefix = "";
        }

        final String s3PathPrefix;
-       final Instant now = Instant.now();
        if (sourceCoordinator.getPartitionPrefix() != null ) {
-           s3PathPrefix = s3UserPathPrefix + sourceCoordinator.getPartitionPrefix() + S3_PATH_DELIMITER + now.toEpochMilli() + S3_PATH_DELIMITER;
+           s3PathPrefix = s3UserPathPrefix + S3_PATH_DELIMITER + sourceCoordinator.getPartitionPrefix();
        } else {
-           s3PathPrefix = s3UserPathPrefix + now.toEpochMilli() + S3_PATH_DELIMITER;
+           s3PathPrefix = s3UserPathPrefix;
        }
        return s3PathPrefix;
    }
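For illustration (hypothetical values): with s3_prefix "my/prefix" and partition prefix "abc123", getS3PathPrefix() now returns "my/prefix/abc123"; before this change it appended an epoch-millis segment and trailing delimiter, e.g.:

// old: "my/prefix/" + "abc123" + "/" + 1726200000000 + "/" -> "my/prefix/abc123/1726200000000/"
// new: "my/prefix" + "/" + "abc123"                        -> "my/prefix/abc123"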
@@ -14,6 +14,9 @@ public class ExportConfig {
@NotNull
private String kmsKeyId;

/**
* The ARN of the IAM role that will be passed to RDS for export.
*/
@JsonProperty("iam_role_arn")
@NotNull
private String iamRoleArn;
@@ -73,6 +73,7 @@ public Event convert(final Event event,
.map(key -> event.get(key, String.class))
.collect(Collectors.joining("|"));
eventMetadata.setAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE, primaryKeyValue);

final String s3PartitionKey = s3Prefix + S3_PATH_DELIMITER + S3_BUFFER_PREFIX + S3_PATH_DELIMITER + hashKeyToPartition(primaryKeyValue);
eventMetadata.setAttribute(MetadataKeyAttributes.EVENT_S3_PARTITION_KEY, s3PartitionKey);
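// The resulting key appears to follow the shape "{s3Prefix}/{S3_BUFFER_PREFIX}/{partition hash}".
// Hypothetical illustration, assuming S3_BUFFER_PREFIX in this converter resolves to "buffer":
// prefix "one/two" with partition hash "17" -> "one/two/buffer/17"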

@@ -133,7 +133,7 @@ private void init() {

private void createExportPartition(RdsSourceConfig sourceConfig) {
ExportProgressState progressState = new ExportProgressState();
-       progressState.setIamRoleArn(sourceConfig.getAwsAuthenticationConfig().getAwsStsRoleArn());
+       progressState.setIamRoleArn(sourceConfig.getExport().getIamRoleArn());
progressState.setBucket(sourceConfig.getS3Bucket());
// This prefix is for data exported from RDS
progressState.setPrefix(getS3PrefixForExport(s3Prefix));
@@ -5,12 +5,16 @@

package org.opensearch.dataprepper.plugins.source.rds.model;

import java.util.Arrays;
import java.util.stream.Collectors;

/**
* Represents the object key for an object exported to S3 by RDS.
* The object key has this structure: "{prefix}/{export task ID}/{database name}/{table name}/{numbered folder}/{file name}"
*/
public class ExportObjectKey {

static final String S3_PATH_DELIMITER = "/";
private final String prefix;
private final String exportTaskId;
private final String databaseName;
@@ -29,18 +33,21 @@ public class ExportObjectKey

public static ExportObjectKey fromString(final String objectKeyString) {

-       final String[] parts = objectKeyString.split("/");
-       if (parts.length != 6) {
+       final String[] parts = objectKeyString.split(S3_PATH_DELIMITER);
+       if (parts.length < 5) {
            throw new IllegalArgumentException("Export object key is not valid: " + objectKeyString);
        }
-       final String prefix = parts[0];
-       final String exportTaskId = parts[1];
-       final String databaseName = parts[2];
+
+       final String prefix = Arrays.stream(parts, 0, parts.length - 5)
+               .collect(Collectors.joining(S3_PATH_DELIMITER));
+       final String exportTaskId = parts[parts.length - 5];
+       final String databaseName = parts[parts.length - 4];
        // fullTableName is in the format of "databaseName.tableName"
-       final String fullTableName = parts[3];
+       final String fullTableName = parts[parts.length - 3];
        final String tableName = fullTableName.split("\\.")[1];
-       final String numberedFolder = parts[4];
-       final String fileName = parts[5];
+       final String numberedFolder = parts[parts.length - 2];
+       final String fileName = parts[parts.length - 1];

return new ExportObjectKey(prefix, exportTaskId, databaseName, tableName, numberedFolder, fileName);
}
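Parsing now counts from the end of the key, so the prefix may span zero or more segments (these examples mirror the new tests below):

// "p1/p2/p3/task-1/db/db.tbl/1/f.parquet" -> prefix "p1/p2/p3", exportTaskId "task-1", databaseName "db", tableName "tbl", numberedFolder "1", fileName "f.parquet"
// "task-1/db/db.tbl/1/f.parquet"          -> prefix "" (no user prefix)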

@@ -1,4 +1,3 @@
plugin_name: "rds"
apply_when:
-  - "$..source.rds"
-  - "$..source.documentdb.s3_bucket"
+  - "$..source.rds"
@@ -0,0 +1,81 @@
"<<pipeline-name>>":
workers: "<<$.<<pipeline-name>>.workers>>"
delay: "<<$.<<pipeline-name>>.delay>>"
buffer: "<<$.<<pipeline-name>>.buffer>>"
source:
rds: "<<$.<<pipeline-name>>.source.rds>>"
routes:
- initial_load: 'getMetadata("ingestion_type") == "EXPORT"'
- stream_load: 'getMetadata("ingestion_type") == "STREAM"'
sink:
- s3:
routes:
- initial_load
aws:
region: "<<$.<<pipeline-name>>.source.rds.s3_region>>"
sts_role_arn: "<<$.<<pipeline-name>>.source.rds.aws.sts_role_arn>>"
sts_external_id: "<<$.<<pipeline-name>>.source.rds.aws.sts_external_id>>"
sts_header_overrides: "<<$.<<pipeline-name>>.source.rds.aws.sts_header_overrides>>"
bucket: "<<$.<<pipeline-name>>.source.rds.s3_bucket>>"
threshold:
event_collect_timeout: "120s"
maximum_size: "2mb"
aggregate_threshold:
maximum_size: "128mb"
flush_capacity_ratio: 0
object_key:
path_prefix: "${getMetadata(\"s3_partition_key\")}"
codec:
event_json:
default_bucket_owner: "<<FUNCTION_NAME:getAccountIdFromRole,PARAMETER:$.<<pipeline-name>>.source.rds.aws.sts_role_arn>>"
- s3:
routes:
- stream_load
aws:
region: "<<$.<<pipeline-name>>.source.rds.s3_region>>"
sts_role_arn: "<<$.<<pipeline-name>>.source.rds.aws.sts_role_arn>>"
sts_external_id: "<<$.<<pipeline-name>>.source.rds.aws.sts_external_id>>"
sts_header_overrides: "<<$.<<pipeline-name>>.source.rds.aws.sts_header_overrides>>"
bucket: "<<$.<<pipeline-name>>.source.rds.s3_bucket>>"
threshold:
event_collect_timeout: "15s"
maximum_size: "1mb"
aggregate_threshold:
maximum_size: "128mb"
flush_capacity_ratio: 0
object_key:
path_prefix: "${getMetadata(\"s3_partition_key\")}"
codec:
event_json:
default_bucket_owner: "<<FUNCTION_NAME:getAccountIdFromRole,PARAMETER:$.<<pipeline-name>>.source.rds.aws.sts_role_arn>>"
"<<pipeline-name>>-s3":
workers: "<<$.<<pipeline-name>>.workers>>"
delay: "<<$.<<pipeline-name>>.delay>>"
buffer: "<<$.<<pipeline-name>>.buffer>>"
source:
s3:
codec:
event_json:
compression: "none"
aws:
region: "<<$.<<pipeline-name>>.source.rds.s3_region>>"
sts_role_arn: "<<$.<<pipeline-name>>.source.rds.aws.sts_role_arn>>"
sts_external_id: "<<$.<<pipeline-name>>.source.rds.aws.sts_external_id>>"
sts_header_overrides: "<<$.<<pipeline-name>>.source.rds.aws.sts_header_overrides>>"
acknowledgments: true
delete_s3_objects_on_read: true
disable_s3_metadata_in_event: true
scan:
folder_partitions:
depth: "<<FUNCTION_NAME:calculateDepthForRdsSource,PARAMETER:$.<<pipeline-name>>.source.rds.s3_prefix>>"
max_objects_per_ownership: 50
buckets:
- bucket:
name: "<<$.<<pipeline-name>>.source.rds.s3_bucket>>"
filter:
include_prefix: ["<<FUNCTION_NAME:getIncludePrefixForRdsSource,PARAMETER:$.<<pipeline-name>>.source.rds.s3_prefix>>"]
scheduling:
interval: "60s"
processor: "<<$.<<pipeline-name>>.processor>>"
sink: "<<$.<<pipeline-name>>.sink>>"
routes: "<<$.<<pipeline-name>>.routes>>" # In placeholder, routes or route (defined as alias) will be transformed to route in json as route will be primarily picked in pipelineModel.
@@ -27,9 +27,35 @@ void test_fromString_with_valid_input_string() {
assertThat(exportObjectKey.getFileName(), equalTo("file-name.parquet"));
}

@Test
void test_fromString_with_path_with_empty_prefix() {
final String objectKeyString = "export-task-id/db-name/db-name.table-name/1/file-name.parquet";
final ExportObjectKey exportObjectKey = ExportObjectKey.fromString(objectKeyString);

assertThat(exportObjectKey.getPrefix(), equalTo(""));
assertThat(exportObjectKey.getExportTaskId(), equalTo("export-task-id"));
assertThat(exportObjectKey.getDatabaseName(), equalTo("db-name"));
assertThat(exportObjectKey.getTableName(), equalTo("table-name"));
assertThat(exportObjectKey.getNumberedFolder(), equalTo("1"));
assertThat(exportObjectKey.getFileName(), equalTo("file-name.parquet"));
}

@Test
void test_fromString_with_path_with_multilevel_prefix() {
final String objectKeyString = "prefix1/prefix2/prefix3/export-task-id/db-name/db-name.table-name/1/file-name.parquet";
final ExportObjectKey exportObjectKey = ExportObjectKey.fromString(objectKeyString);

assertThat(exportObjectKey.getPrefix(), equalTo("prefix1/prefix2/prefix3"));
assertThat(exportObjectKey.getExportTaskId(), equalTo("export-task-id"));
assertThat(exportObjectKey.getDatabaseName(), equalTo("db-name"));
assertThat(exportObjectKey.getTableName(), equalTo("table-name"));
assertThat(exportObjectKey.getNumberedFolder(), equalTo("1"));
assertThat(exportObjectKey.getFileName(), equalTo("file-name.parquet"));
}

@Test
void test_fromString_with_invalid_input_string() {
final String objectKeyString = "prefix/export-task-id/db-name/table-name/1/";
final String objectKeyString = "export-task-id/db-name/table-name/1";

Throwable exception = assertThrows(IllegalArgumentException.class, () -> ExportObjectKey.fromString(objectKeyString));
assertThat(exception.getMessage(), containsString("Export object key is not valid: " + objectKeyString));
