Skip to content

Commit ad33a3f

Browse files
committed
Carabas/DMS: Improve CloudFormation stack
- Configure ReplicationType to use `full-load-and-cdc`. - Configure ReplicationSettings to use `EnableBeforeImage`. - Add RDSParameterGroup to configure pgaudit, pglogical, and pg_stat_statements plugins. - Configure DMS source endpoint (PostgreSQL) to use pglogical. - Configure DMS target endpoint (Kinesis) to include all optional details: ControlDetails, PartitionValue, TransactionDetails, NullAndEmpty, TableAlterOperations, IncludeSchemaTable - Add `RDSInstanceArn` output variable. - Add `ReplicationArn` output variable.
1 parent 9cc8e17 commit ad33a3f

File tree

3 files changed

+76
-28
lines changed

3 files changed

+76
-28
lines changed

doc/carabas/research.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,11 @@
3434

3535
## DMS
3636
- https://stackoverflow.com/questions/77995867/dynamic-tables-via-dms-kinesis-iceberg-transactional-data-lake
37+
- https://aws.amazon.com/blogs/database/tune-replication-performance-with-aws-dms-for-an-amazon-kinesis-data-streams-target-endpoint-part-3/
38+
- https://www.cockroachlabs.com/docs/stable/aws-dms
39+
40+
## wal2json
41+
- https://hevodata.com/learn/pg-logical/
42+
- https://aws.amazon.com/blogs/database/stream-changes-from-amazon-rds-for-postgresql-using-amazon-kinesis-data-streams-and-aws-lambda/
43+
- https://github.com/eulerto/wal2json
44+
- https://docs.aws.amazon.com/AmazonRDS/latest/PostgreSQLReleaseNotes/postgresql-extensions.html#postgresql-extensions-15x

examples/aws/rds_postgresql_kinesis_lambda_oci_cratedb.py

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ def main():
1919
- CrateDB Cloud
2020
2121
Prerequisites: Register an OCI repository.
22+
23+
Resources:
24+
- https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_VPC.WorkingWithRDSInstanceinaVPC.html
2225
"""
2326

2427
# Build and publish OCI image that includes the AWS Lambda function.
@@ -70,34 +73,13 @@ def main():
7073
psql_command = (
7174
f'psql "postgresql://{stack.db_username}:{stack.db_password}@{PublicDbEndpoint}:{PublicDbPort}/postgres"'
7275
)
76+
77+
print("Result of CloudFormation deployment:")
7378
print(psql_command)
7479

80+
print("RDS Instance ARN:", stack.get_output_value(stack._bsm, "RDSInstanceArn"))
7581
print("Stream ARN:", stack.get_output_value(stack._bsm, "StreamArn"))
76-
77-
"""
78-
aws dms describe-replications
79-
aws dms start-replication \
80-
--start-replication-type=start-replication \
81-
--replication-config-arn arn:aws:dms:eu-central-1:931394475905:replication-config:LB2JAGY7XFB7PA7HEX3MI36CUA
82-
83-
aws logs describe-log-groups
84-
aws logs start-live-tail --log-group-identifiers \
85-
arn:aws:logs:eu-central-1:931394475905:log-group:/aws/rds/instance/testdrive-dms-postgresql-dev-db/postgresql \
86-
arn:aws:logs:eu-central-1:931394475905:log-group:dms-serverless-replication-LB2JAGY7XFB7PA7HEX3MI36CUA
87-
88-
aws cloudformation continue-update-rollback --stack-name testdrive-dms-postgresql-dev
89-
aws cloudformation delete-stack --stack-name testdrive-dms-postgresql-dev
90-
"""
91-
"""
92-
- https://docs.aws.amazon.com/dms/latest/APIReference/API_StartReplication.html#DMS-StartReplication-request-StartReplicationType
93-
- https://docs.aws.amazon.com/cli/latest/reference/dms/start-replication-task.html
94-
95-
Possible values:
96-
97-
- start-replication
98-
- resume-processing
99-
- reload-target
100-
"""
82+
print("Replication ARN:", stack.get_output_value(stack._bsm, "ReplicationArn"))
10183

10284

10385
if __name__ == "__main__":

lorrystream/carabas/aws/stack/dms.py

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import typing as t
23

34
import attr
@@ -199,6 +200,28 @@ def database(self):
199200
)
200201
group.add(self._db_security_group)
201202

203+
# aws rds describe-db-parameter-groups
204+
# aws rds describe-db-parameters --db-parameter-group-name default.postgres15
205+
db_parameter_group = rds.DBParameterGroup(
206+
"RDSPostgreSQLParameterGroup",
207+
rp_Family="postgres15",
208+
rp_Description="DMS parameter group for postgres15",
209+
p_DBParameterGroupName="dms-postgres15",
210+
# aws rds describe-db-parameters --db-parameter-group-name default.postgres15
211+
p_Parameters={
212+
"log_connections": True,
213+
# https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Appendix.PostgreSQL.CommonDBATasks.pgaudit.html
214+
"pgaudit.log": "all",
215+
"pgaudit.log_statement_once": True,
216+
# `rds.logical_replication is a cluster level setting, not db instance setting?
217+
# https://stackoverflow.com/a/66252465
218+
"rds.logical_replication": True,
219+
# TODO: wal2json?
220+
"shared_preload_libraries": "pgaudit,pglogical,pg_stat_statements",
221+
},
222+
)
223+
group.add(db_parameter_group)
224+
202225
db = rds.DBInstance(
203226
"RDSPostgreSQL",
204227
p_DBInstanceClass="db.t3.micro",
@@ -208,6 +231,7 @@ def database(self):
208231
# The current default engine version for AWS DMS is 3.5.2.
209232
# https://docs.aws.amazon.com/dms/latest/userguide/CHAP_ReleaseNotes.html
210233
p_EngineVersion="15",
234+
p_DBParameterGroupName="dms-postgres15",
211235
# The parameter AllocatedStorage must be provided and must not be null.
212236
# Invalid storage size for engine name postgres and storage type gp2: 1
213237
p_AllocatedStorage="5",
@@ -235,11 +259,17 @@ def database(self):
235259
Name=cf.Sub.from_params(f"{self.env_name}-db"),
236260
Description=cf.Sub.from_params(f"The DB instance for {self.env_name}"),
237261
),
238-
ra_DependsOn=[self._db_security_group, self._db_subnet_group],
262+
ra_DependsOn=[db_parameter_group, self._db_security_group, self._db_subnet_group],
239263
)
240264
self._db = db
241265
group.add(db)
242266

267+
rds_arn = cf.Output(
268+
"RDSInstanceArn",
269+
Value=db.rv_DBInstanceArn,
270+
)
271+
group.add(rds_arn)
272+
243273
public_endpoint = cf.Output(
244274
"PublicDbEndpoint",
245275
Value=db.rv_EndpointAddress,
@@ -406,6 +436,9 @@ def dms(self):
406436
)
407437
group.add(vpc_endpoint_stream)
408438

439+
# https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.PostgreSQL.html#CHAP_Source.PostgreSQL.Advanced
440+
# https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.PostgreSQL.html#CHAP_Source.PostgreSQL.RDSPostgreSQL
441+
# https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.PostgreSQL.html#CHAP_Source.PostgreSQL.ConnectionAttrib
409442
source_endpoint = dms.Endpoint( # type: ignore[call-arg,misc]
410443
"DMSSourceEndpoint",
411444
rp_EndpointType="source",
@@ -417,6 +450,12 @@ def dms(self):
417450
p_Username=self.db_username,
418451
p_Password=self.db_password,
419452
p_DatabaseName="postgres",
453+
p_ExtraConnectionAttributes=json.dumps(
454+
{
455+
"CaptureDdls": True,
456+
"PluginName": "pglogical",
457+
}
458+
),
420459
p_EndpointIdentifier=f"{self.env_name}-endpoint-source",
421460
ra_DependsOn=[self._db],
422461
)
@@ -427,6 +466,12 @@ def dms(self):
427466
p_KinesisSettings=dms.PropEndpointKinesisSettings(
428467
p_StreamArn=self._stream.rv_Arn,
429468
p_MessageFormat="json-unformatted",
469+
p_IncludeControlDetails=True,
470+
p_IncludePartitionValue=True,
471+
p_IncludeTransactionDetails=True,
472+
p_IncludeNullAndEmpty=True,
473+
p_IncludeTableAlterOperations=True,
474+
p_PartitionIncludeSchemaTable=True,
430475
# The parameter ServiceAccessRoleArn must be provided and must not be blank.
431476
p_ServiceAccessRoleArn=dms_target_access_role.rv_Arn,
432477
),
@@ -437,6 +482,7 @@ def dms(self):
437482
group.add(target_endpoint)
438483

439484
# FIXME: Currently hard-coded to table `public.foo`.
485+
# https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html
440486
map_to_kinesis = {
441487
"rules": [
442488
{
@@ -466,7 +512,7 @@ def dms(self):
466512
"DMSReplicationConfig",
467513
rp_ReplicationConfigIdentifier=f"{self.env_name}-dms-serverless",
468514
# p_ResourceIdentifier=f"{self.env_name}-dms-serverless-resource", # noqa: ERA001
469-
rp_ReplicationType="full-load",
515+
rp_ReplicationType="full-load-and-cdc",
470516
rp_SourceEndpointArn=source_endpoint.ref(),
471517
rp_TargetEndpointArn=target_endpoint.ref(),
472518
rp_ComputeConfig=dms.PropReplicationConfigComputeConfig(
@@ -478,6 +524,12 @@ def dms(self):
478524
),
479525
rp_TableMappings=map_to_kinesis,
480526
p_ReplicationSettings={
527+
# https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.BeforeImage.html
528+
"BeforeImageSettings": {
529+
"EnableBeforeImage": True,
530+
"FieldName": "before-image",
531+
"ColumnFilter": "pk-only",
532+
},
481533
# https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.Logging.html
482534
"Logging": {
483535
"EnableLogging": True,
@@ -507,7 +559,7 @@ def dms(self):
507559
# {"Id": "VALIDATOR", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, # noqa: ERA001
508560
{"Id": "VALIDATOR_EXT", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"},
509561
],
510-
}
562+
},
511563
},
512564
ra_DependsOn=[
513565
dms_replication_subnet_group,
@@ -521,6 +573,12 @@ def dms(self):
521573
)
522574
group.add(serverless_replication)
523575

576+
replication_arn = cf.Output(
577+
"ReplicationArn",
578+
Value=serverless_replication.rv_ReplicationConfigArn,
579+
)
580+
group.add(replication_arn)
581+
524582
return self.add(group)
525583

526584
@property

0 commit comments

Comments
 (0)