From 12a7c3e11247a8f5817938666d489223b83e9368 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Sun, 28 Jul 2024 18:52:15 +0200 Subject: [PATCH] Carabas: Add example DMS Serverless stack --- ...s_postgresql_kinesis_lambda_oci_cratedb.py | 105 ++++ lorrystream/carabas/aws/__init__.py | 6 +- lorrystream/carabas/aws/model.py | 74 ++- lorrystream/carabas/aws/stack/__init__.py | 0 lorrystream/carabas/aws/stack/dms.py | 574 ++++++++++++++++++ .../aws/{stack.py => stack/dynamodb.py} | 59 +- 6 files changed, 758 insertions(+), 60 deletions(-) create mode 100644 examples/aws/rds_postgresql_kinesis_lambda_oci_cratedb.py create mode 100644 lorrystream/carabas/aws/stack/__init__.py create mode 100644 lorrystream/carabas/aws/stack/dms.py rename lorrystream/carabas/aws/{stack.py => stack/dynamodb.py} (67%) diff --git a/examples/aws/rds_postgresql_kinesis_lambda_oci_cratedb.py b/examples/aws/rds_postgresql_kinesis_lambda_oci_cratedb.py new file mode 100644 index 0000000..a5e3492 --- /dev/null +++ b/examples/aws/rds_postgresql_kinesis_lambda_oci_cratedb.py @@ -0,0 +1,105 @@ +import logging + +from lorrystream.carabas.aws import RDSPostgreSQLDMSKinesisPipe +from lorrystream.util.common import setup_logging + +logger = logging.getLogger(__name__) + + +def main(): + """ + A recipe to deploy a data migration stack to Amazon AWS. + + Pipeline: + - RDS PostgreSQL -> DMS -> Kinesis Stream -> Python Lambda via OCI -> CrateDB + + Ingredients: + - DMS, RDS PostgreSQL, Kinesis + - Lambda function, shipped per OCI image + - CrateDB Cloud + + Prerequisites: Register an OCI repository. + """ + + # Build and publish OCI image that includes the AWS Lambda function. + """ + python_image = LambdaPythonImage( + name="cratedb-kinesis-lambda", + entrypoint_file=Path("./lorrystream/process/kinesis_cratedb_lambda.py"), + entrypoint_handler="kinesis_cratedb_lambda.handler", + ) + python_image.publish() + """ + + # Define an AWS CloudFormation software stack. + stack = RDSPostgreSQLDMSKinesisPipe( + project="testdrive-dms-postgresql", + stage="dev", + region="eu-central-1", + description="RDS PostgreSQL > DMS -> Kinesis Stream -> Python Lambda via OCI -> CrateDB", + db_username="dynapipe", + db_password="secret11", # noqa: S106 + environment={ + "SINK_SQLALCHEMY_URL": "crate://admin:dZ..qB@example.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true", + "SINK_TABLE": "transactions", + }, + ) + + # Add components to the stack. + """ + stack.table().processor( + LambdaFactory( + name="DynamoDBCrateDBProcessor", + oci_uri=python_image.uri, + handler=python_image.entrypoint_handler, + ) + ).connect() + """ + stack.vpc().database().stream().dms() # .table() + + # Deploy stack. + stack.deploy() + logger.info(f"Deployed stack: {stack}") + + # Refresh the OCI image. + # TODO: Detect when changed. 
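+ # Note: This is an imperative post-deployment step, not part of the CloudFormation
+ # template. It reads the Lambda ARN from the stack outputs and calls
+ # `update_function_code`, which respawns the function container with the freshly
+ # published OCI image. With no processor defined on this stack (the `.processor(...)`
+ # block above is deactivated), the call is a no-op.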
+ stack.deploy_processor_image() + + PublicDbEndpoint = stack.get_output_value(stack._bsm, "PublicDbEndpoint") + PublicDbPort = stack.get_output_value(stack._bsm, "PublicDbPort") + psql_command = ( + f'psql "postgresql://{stack.db_username}:{stack.db_password}@{PublicDbEndpoint}:{PublicDbPort}/postgres"' + ) + print(psql_command) + + print("Stream ARN:", stack.get_output_value(stack._bsm, "StreamArn")) + + """ + aws dms describe-replications + aws dms start-replication \ + --start-replication-type=start-replication \ + --replication-config-arn arn:aws:dms:eu-central-1:931394475905:replication-config:LB2JAGY7XFB7PA7HEX3MI36CUA + + aws logs describe-log-groups + aws logs start-live-tail --log-group-identifiers \ + arn:aws:logs:eu-central-1:931394475905:log-group:/aws/rds/instance/testdrive-dms-postgresql-dev-db/postgresql \ + arn:aws:logs:eu-central-1:931394475905:log-group:dms-serverless-replication-LB2JAGY7XFB7PA7HEX3MI36CUA + + aws cloudformation continue-update-rollback --stack-name testdrive-dms-postgresql-dev + aws cloudformation delete-stack --stack-name testdrive-dms-postgresql-dev + """ + """ + - https://docs.aws.amazon.com/dms/latest/APIReference/API_StartReplication.html#DMS-StartReplication-request-StartReplicationType + - https://docs.aws.amazon.com/cli/latest/reference/dms/start-replication-task.html + + Possible values: + + - start-replication + - resume-processing + - reload-target + """ + + +if __name__ == "__main__": + setup_logging() + main() diff --git a/lorrystream/carabas/aws/__init__.py b/lorrystream/carabas/aws/__init__.py index 904af12..7eb061e 100644 --- a/lorrystream/carabas/aws/__init__.py +++ b/lorrystream/carabas/aws/__init__.py @@ -1,9 +1,11 @@ from lorrystream.carabas.aws.function.model import LambdaFactory from lorrystream.carabas.aws.function.oci import LambdaPythonImage -from lorrystream.carabas.aws.stack import DynamoDBKinesisPipe +from lorrystream.carabas.aws.stack.dms import RDSPostgreSQLDMSKinesisPipe +from lorrystream.carabas.aws.stack.dynamodb import DynamoDBKinesisPipe __all__ = [ + "DynamoDBKinesisPipe", "LambdaFactory", "LambdaPythonImage", - "DynamoDBKinesisPipe", + "RDSPostgreSQLDMSKinesisPipe", ] diff --git a/lorrystream/carabas/aws/model.py b/lorrystream/carabas/aws/model.py index ecd952c..179c43c 100644 --- a/lorrystream/carabas/aws/model.py +++ b/lorrystream/carabas/aws/model.py @@ -1,9 +1,15 @@ import logging +import typing as t import attr +import botocore import cottonformation as cf from aws_cloudformation import Parameter from boto_session_manager import BotoSesManager +from cottonformation.res import kinesis + +if t.TYPE_CHECKING: + from lorrystream.carabas.aws.function.model import LambdaResource logger = logging.getLogger(__name__) @@ -27,11 +33,12 @@ def post_hook(self): self.template.Description = self.description self.define_parameters() - def add(self, thing): + def add(self, *things): """ A shortcut function to add a component to the current template of this Stack. """ - self.template.add(thing) + for thing in things: + self.template.add(thing) return self @property @@ -87,5 +94,68 @@ def deploy(self, respawn: bool = False): include_named_iam=True, verbose=True, skip_prompt=True, + # 300 seconds are not enough to wait for RDS PostgreSQL, for example. + timeout=500, ) return self + + +@attr.s +class GenericProcessorStack(GenericEnvStack): + + _processor: t.Optional["LambdaResource"] = None + + def deploy_processor_image(self): + """ + Make an already running Lambda pick up a newly published OCI image. 
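+
+ A minimal usage sketch (assuming the stack defines a processor Lambda, and that a
+ refreshed image has just been pushed to the same OCI repository, e.g. via
+ ``LambdaPythonImage(...).publish()``)::
+
+     stack.deploy()
+     stack.deploy_processor_image()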
+ + This is an imperative function executed orthogonally to the CloudFormation deployment. + + It follows this procedure: + - Acquire the `Arn` Output of the Stack's core processor Lambda. + - Use it to look up a handle to the actual Lambda information. + - From the information unit, extract the OCI image URI. + - Instruct the machinery to update the Lambda function code, + effectively respawning the container running it. + """ + if not self._processor: + logger.warning("No processor defined, skip deploying processor OCI image") + return None + function_id = self._processor.function.id + + # Inquire Stack Output. + logger.info(f"Discovering Lambda function existence: {function_id}") + output_id = f"{function_id}Arn" + try: + function_arn = self.get_output_value(self._bsm, output_id) + except botocore.exceptions.ClientError as ex: + if "does not exist" not in str(ex): + raise + logger.info(f"Stack not found or incomplete: {self.stack_name}") + return None + except KeyError: + logger.info(f"Stack not found or incomplete. Output not found: {output_id}") + return None + + # Inquire AWS API and eventually update Lambda code. + client = self._bsm.get_client("lambda") + try: + if func := client.get_function(FunctionName=function_arn): + logger.info(f"Found Lambda function: {function_arn}") + oci_uri = func["Code"]["ImageUri"] + logger.info(f"Deploying new OCI image to Lambda function: {oci_uri}") + response = client.update_function_code(FunctionName=function_arn, ImageUri=oci_uri) + last_status_message = response["LastUpdateStatusReason"] + logger.info(f"Lambda update status response: {last_status_message}") + except Exception as ex: + if ex.__class__.__name__ != "ResourceNotFoundException": + raise + logger.info(f"Lambda function to update OCI image not found: {function_arn}") + + return self + + +@attr.s +class KinesisProcessorStack(GenericProcessorStack): + + _event_source: t.Optional[t.Union[kinesis.Stream]] = None diff --git a/lorrystream/carabas/aws/stack/__init__.py b/lorrystream/carabas/aws/stack/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lorrystream/carabas/aws/stack/dms.py b/lorrystream/carabas/aws/stack/dms.py new file mode 100644 index 0000000..55f7cb0 --- /dev/null +++ b/lorrystream/carabas/aws/stack/dms.py @@ -0,0 +1,574 @@ +import typing as t + +import attr +import cottonformation as cf +from cottonformation import ResourceGroup +from cottonformation.res import awslambda, ec2, iam, kinesis, rds + +from lorrystream.carabas.aws import LambdaFactory +from lorrystream.carabas.aws.cf import dms2024 as dms +from lorrystream.carabas.aws.model import KinesisProcessorStack + + +@attr.s +class RDSPostgreSQLDMSKinesisPipe(KinesisProcessorStack): + """ + A description for an AWS CloudFormation stack for migrating from PostgreSQL. + It is written down in Python, uses OO, and a fluent API. + + It provides elements to implement this kind of pipeline: + + RDS PostgreSQL -> DMS -> Kinesis Stream -> Python Lambda via OCI -> CrateDB + + See also the canonical AWS documentation about relevant topics. 
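+
+ Synopsis (an illustrative sketch, condensed from
+ ``examples/aws/rds_postgresql_kinesis_lambda_oci_cratedb.py``)::
+
+     stack = RDSPostgreSQLDMSKinesisPipe(
+         project="testdrive-dms-postgresql",
+         stage="dev",
+         region="eu-central-1",
+         description="RDS PostgreSQL -> DMS -> Kinesis Stream -> Python Lambda via OCI -> CrateDB",
+         db_username="dynapipe",
+         db_password="...",
+     )
+     stack.vpc().database().stream().dms()
+     stack.deploy()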
+ + Documentation: + - https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Serverless.Components.html + - https://docs.aws.amazon.com/dms/latest/userguide/CHAP_VPC_Endpoints.html + - https://docs.aws.amazon.com/dms/latest/userguide/security-iam-awsmanpol.html + - https://docs.aws.amazon.com/dms/latest/userguide/security-iam.html#CHAP_Security.IAMPermissions + + Resources: + - https://aws.amazon.com/blogs/database/orchestrate-an-aws-dms-serverless-replication-task-using-aws-cli/ + - https://aws.amazon.com/blogs/aws/new-aws-dms-serverless-automatically-provisions-and-scales-capacity-for-migration-and-data-replication/ + - https://github.com/aws-cloudformation/aws-cloudformation-templates/blob/main/DMS/DMSAuroraToS3FullLoadAndOngoingReplication.yaml + """ + + db_username: str = attr.ib() + db_password: str = attr.ib() + + environment: t.Dict[str, str] = attr.ib(factory=dict) + + _vpc: ec2.VPC = None + _public_subnet1: ec2.Subnet = None + _public_subnet2: ec2.Subnet = None + _db_subnet_group: rds.DBSubnetGroup = None + _db_security_group: ec2.SecurityGroup = None + + _db: rds.DBInstance = None + _stream: kinesis.Stream = None + + def vpc(self): + group = ResourceGroup() + + self._vpc = ec2.VPC( + "VPCInstance", + p_CidrBlock="10.0.0.0/24", + p_EnableDnsHostnames=True, + p_EnableDnsSupport=True, + p_Tags=cf.Tag.make_many( + Name=cf.Sub.from_params(f"{self.env_name}-vpc"), + Description=cf.Sub.from_params(f"The VPC for {self.env_name}"), + ), + ) + group.add(self._vpc) + + # Even if you are deploying a single-az instance, you have to + # specify multiple availability zones in the DB subnet group. + # https://stackoverflow.com/a/70658040 + # https://stackoverflow.com/a/63975208 + self._public_subnet1 = ec2.Subnet( + "VPCPublicSubnet1", + p_CidrBlock="10.0.0.0/26", + rp_VpcId=self._vpc.ref(), + p_AvailabilityZone=cf.GetAZs.n_th(1), + p_MapPublicIpOnLaunch=False, + p_Tags=cf.Tag.make_many( + Name=cf.Sub.from_params(f"{self.env_name}-vpc-subnet1"), + Description=cf.Sub.from_params(f"The VPC subnet 1 for {self.env_name}"), + ), + ra_DependsOn=self._vpc, + ) + self._public_subnet2 = ec2.Subnet( + "VPCPublicSubnet2", + p_CidrBlock="10.0.0.64/26", + rp_VpcId=self._vpc.ref(), + p_AvailabilityZone=cf.GetAZs.n_th(2), + p_MapPublicIpOnLaunch=False, + p_Tags=cf.Tag.make_many( + Name=cf.Sub.from_params(f"{self.env_name}-vpc-subnet2"), + Description=cf.Sub.from_params(f"The VPC subnet 2 for {self.env_name}"), + ), + ra_DependsOn=self._vpc, + ) + group.add(self._public_subnet1) + group.add(self._public_subnet2) + + # Cannot create a publicly accessible DBInstance. + # The specified VPC has no internet gateway attached. 
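+ # The internet gateway, its VPC attachment, and the 0.0.0.0/0 default route defined
+ # below address exactly that, so the publicly accessible RDS instance can be reached
+ # from outside the VPC, e.g. with the `psql` command printed by the example program.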
+ gateway = ec2.InternetGateway( + "VPCGateway", + p_Tags=cf.Tag.make_many( + Name=cf.Sub.from_params(f"{self.env_name}-vpc-gateway"), + Description=cf.Sub.from_params(f"The VPC gateway for {self.env_name}"), + ), + ra_DependsOn=self._vpc, + ) + gateway_attachment = ec2.VPCGatewayAttachment( + "VPCGatewayAttachment", + rp_VpcId=self._vpc.ref(), + p_InternetGatewayId=gateway.ref(), + ra_DependsOn=[self._vpc, gateway], + ) + group.add(gateway) + group.add(gateway_attachment) + + route_table = ec2.RouteTable( + "VPCRouteTable", + rp_VpcId=self._vpc.ref(), + p_Tags=cf.Tag.make_many( + Name=cf.Sub.from_params(f"{self.env_name}-vpc-route-table"), + Description=cf.Sub.from_params(f"The VPC routing table for {self.env_name}"), + ), + ) + group.add(route_table) + + default_route = ec2.Route( + "VPCDefaultRoute", + rp_RouteTableId=route_table.ref(), + p_DestinationCidrBlock="0.0.0.0/0", + p_GatewayId=gateway.ref(), + ra_DependsOn=gateway_attachment, + ) + group.add(default_route) + + subnet_route_1 = ec2.SubnetRouteTableAssociation( + "VPCSubnetRoute1", + rp_RouteTableId=route_table.ref(), + rp_SubnetId=self._public_subnet1.ref(), + ra_DependsOn=[route_table, self._public_subnet1], + ) + subnet_route_2 = ec2.SubnetRouteTableAssociation( + "VPCSubnetRoute2", + rp_RouteTableId=route_table.ref(), + rp_SubnetId=self._public_subnet2.ref(), + ra_DependsOn=[route_table, self._public_subnet2], + ) + group.add(subnet_route_1) + group.add(subnet_route_2) + + return self.add(group) + + def database(self): + group = ResourceGroup() + + # https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/USER_VPC.WorkingWithRDSInstanceinaVPC.html + self._db_subnet_group = rds.DBSubnetGroup( + "RDSPostgreSQLDBSubnetGroup", + rp_DBSubnetGroupDescription=f"DB subnet group for {self.env_name}", + rp_SubnetIds=[self._public_subnet1.ref(), self._public_subnet2.ref()], + p_DBSubnetGroupName=f"{self.env_name}-db-subnet-group", + p_Tags=cf.Tag.make_many(Name=cf.Sub.from_params(f"{self.env_name}-db-subnet-group")), + ra_DependsOn=[self._public_subnet1, self._public_subnet2], + ) + group.add(self._db_subnet_group) + + self._db_security_group = ec2.SecurityGroup( + "RDSPostgreSQLSecurityGroup", + rp_GroupDescription=f"DB security group for {self.env_name}", + p_GroupName=f"{self.env_name}-db-security-group", + p_VpcId=self._vpc.ref(), + p_SecurityGroupIngress=[ + ec2.PropSecurityGroupIngress( + rp_IpProtocol="TCP", + p_Description="Allow access from VPC", + p_FromPort=5432, + p_ToPort=5432, + p_CidrIp="10.0.0.0/24", + ), + # TODO: Possibly restrict to single provided ClientIP? + ec2.PropSecurityGroupIngress( + rp_IpProtocol="TCP", + p_Description="Allow access from outside", + p_FromPort=5432, + p_ToPort=5432, + p_CidrIp="0.0.0.0/0", + ), + ], + p_SecurityGroupEgress=[ + ec2.PropSecurityGroupEgress( + rp_IpProtocol="-1", + p_Description="Allow any access out", + p_FromPort=-1, + p_ToPort=-1, + p_CidrIp="0.0.0.0/0", + ) + ], + p_Tags=cf.Tag.make_many(Name=cf.Sub.from_params(f"{self.env_name}-db-security-group")), + ra_DependsOn=[self._vpc], + ) + group.add(self._db_security_group) + + db = rds.DBInstance( + "RDSPostgreSQL", + p_DBInstanceClass="db.t3.micro", + p_DBInstanceIdentifier=f"{self.env_name}-db", + p_Engine="postgres", + # PostgreSQL 16 only supported by DMS 3.5.3. + # The current default engine version for AWS DMS is 3.5.2. + # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_ReleaseNotes.html + p_EngineVersion="15", + # The parameter AllocatedStorage must be provided and must not be null. 
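+ # AllocatedStorage is expressed in GiB, and very small values are rejected: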
+ # Invalid storage size for engine name postgres and storage type gp2: 1 + p_AllocatedStorage="5", + # p_StorageType="gp3", # noqa: ERA001 + # Setting this parameter to 0 disables automated backups. + # Disabling automated backups speeds up the provisioning process. + p_BackupRetentionPeriod=0, + # To disable collection of Enhanced Monitoring metrics, specify 0. + p_MonitoringInterval=0, + p_EnablePerformanceInsights=False, + p_MasterUsername=self.db_username, + p_MasterUserPassword=self.db_password, + p_PubliclyAccessible=True, + p_MultiAZ=False, + p_VPCSecurityGroups=[ + self._db_security_group.ref(), + ], + # If there's no DB subnet group, then the DB instance isn't a VPC DB instance. + p_DBSubnetGroupName=self._db_subnet_group.ref(), + p_EnableCloudwatchLogsExports=["postgresql", "upgrade"], + ra_UpdateReplacePolicy="Retain", + ra_DeletionPolicy="Retain", + # p_DBName="testdrive", # noqa: ERA001 + p_Tags=cf.Tag.make_many( + Name=cf.Sub.from_params(f"{self.env_name}-db"), + Description=cf.Sub.from_params(f"The DB instance for {self.env_name}"), + ), + ra_DependsOn=[self._db_security_group, self._db_subnet_group], + ) + self._db = db + group.add(db) + + public_endpoint = cf.Output( + "PublicDbEndpoint", + Value=db.rv_EndpointAddress, + ) + group.add(public_endpoint) + + public_db_port = cf.Output( + "PublicDbPort", + Value=db.rv_EndpointPort, + ) + group.add(public_db_port) + return self.add(group) + + def stream(self): + group = ResourceGroup() + # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html#CHAP_Target.Kinesis.Prerequisites + + self._stream = kinesis.Stream( + id="KinesisStream", + p_Name=f"{self.env_name}-stream", + p_StreamModeDetails={"rp_StreamMode": "ON_DEMAND"}, + ) + stream_arn = cf.Output( + "StreamArn", + Value=self._stream.rv_Arn, + ) + group.add(self._stream) + group.add(stream_arn) + return self.add(group) + + def dms(self): + """ + An AWS DMS Serverless CloudFormation description for demonstration purposes. + + https://docs.aws.amazon.com/dms/latest/userguide/security-iam.html#CHAP_Security.APIRole + + Database Migration Service requires the below IAM Roles to be created before + replication instances can be created. See the DMS Documentation for + additional information: https://docs.aws.amazon.com/dms/latest/userguide/security-iam.html#CHAP_Security.APIRole + * dms-vpc-role + * dms-cloudwatch-logs-role + * dms-access-for-endpoint + + If you use the AWS CLI or the AWS DMS API for your database migration, you must add three IAM roles + to your AWS account before you can use the features of AWS DMS. Two of these are `dms-vpc-role` and + `dms-cloudwatch-logs-role`. + + If you use Amazon Redshift as a target database, you must also add the IAM role + `dms-access-for-endpoint` to your AWS account. + + -- https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/dms_replication_instance.html + -- https://github.com/hashicorp/terraform-provider-aws/issues/19580 + -- https://docs.aws.amazon.com/dms/latest/userguide/security-iam.html#CHAP_Security.APIRole + """ + group = ResourceGroup() + + # Trust policy that is associated with upcoming roles. + # Trust policies define which entities can assume the role. + # You can associate only one trust policy with a role. + trust_policy_dms = cf.helpers.iam.AssumeRolePolicyBuilder( + cf.helpers.iam.ServicePrincipal.dms(), + ).build() + + dms_vpc_role = iam.Role( + id="DMSVPCManagementRole", + rp_AssumeRolePolicyDocument=trust_policy_dms, + # Role name must strictly be `dms-vpc-role`? 
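+ # AWS DMS apparently resolves this service role by its fixed name, so no
+ # environment-specific prefix is applied here; see: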
+ # https://stackoverflow.com/q/58542334 + # https://github.com/hashicorp/terraform-provider-aws/issues/7748 + # https://github.com/hashicorp/terraform-provider-aws/issues/11025 + # p_RoleName=cf.Sub("${EnvName}-dms-vpc-role", {"EnvName": self.param_env_name.ref()}), # noqa: ERA001, E501 + p_RoleName="dms-vpc-role", + p_Description="DMS VPC management IAM role", + p_ManagedPolicyArns=[ + cf.helpers.iam.AwsManagedPolicy.AmazonDMSVPCManagementRole, + ], + ) + dms_cloudwatch_role = iam.Role( + id="DMSCloudWatchLogsRole", + rp_AssumeRolePolicyDocument=trust_policy_dms, + # Role name must strictly be `dms-cloudwatch-logs-role`? + # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Troubleshooting.html#CHAP_Troubleshooting.General.CWL + # p_RoleName=cf.Sub("${EnvName}-dms-cloudwatch-logs-role", {"EnvName": self.param_env_name.ref()}), # noqa: ERA001, E501 + p_RoleName="dms-cloudwatch-logs-role", + p_Description="DMS CloudWatch IAM role", + p_ManagedPolicyArns=[ + cf.helpers.iam.AwsManagedPolicy.AmazonDMSCloudWatchLogsRole, + ], + ) + group.add(dms_vpc_role) + group.add(dms_cloudwatch_role) + + # Allow DMS accessing the data sink. In this case, Kinesis. + # For Redshift, this role needs to be called `dms-access-for-endpoint`. + dms_target_access_role = iam.Role( + id="DMSTargetAccessRole", + rp_AssumeRolePolicyDocument=trust_policy_dms, + p_RoleName=cf.Sub("${EnvName}-dms-target-access-role", {"EnvName": self.param_env_name.ref()}), + p_Description="DMS target access IAM role", + p_ManagedPolicyArns=[ + cf.helpers.iam.AwsManagedPolicy.AmazonKinesisFullAccess, + ], + ra_DependsOn=self._stream, + ) + group.add(dms_target_access_role) + + # Create a replication subnet group given a list of the subnet IDs in a VPC. + # https://docs.aws.amazon.com/dms/latest/APIReference/API_CreateReplicationSubnetGroup.html + # """ + dms_replication_subnet_group = dms.ReplicationSubnetGroup( # type: ignore[call-arg,misc] + "DMSReplicationSubnetGroup", + rp_SubnetIds=[self._public_subnet1.ref(), self._public_subnet2.ref()], + rp_ReplicationSubnetGroupDescription=f"DMS replication subnet group for {self.env_name}", + p_ReplicationSubnetGroupIdentifier=f"{self.env_name}-dms-subnet-group", + ra_DependsOn=[dms_vpc_role], + ) + group.add(dms_replication_subnet_group) + # """ + + dms_security_group = ec2.SecurityGroup( + "DMSSecurityGroup", + rp_GroupDescription=f"DMS security group for {self.env_name}", + p_GroupName=f"{self.env_name}-dms-security-group", + p_VpcId=self._vpc.ref(), + p_SecurityGroupIngress=[ + ec2.PropSecurityGroupIngress( + rp_IpProtocol="-1", + p_Description="Allow access from VPC", + p_FromPort=-1, + p_ToPort=-1, + p_CidrIp="10.0.0.0/24", + ), + # TODO: Possibly restrict to single provided ClientIP? + ec2.PropSecurityGroupIngress( + rp_IpProtocol="-1", + p_Description="Allow access from outside", + p_FromPort=-1, + p_ToPort=-1, + p_CidrIp="0.0.0.0/0", + ), + ], + p_SecurityGroupEgress=[ + ec2.PropSecurityGroupEgress( + rp_IpProtocol="-1", + p_Description="Allow any access out", + p_FromPort=-1, + p_ToPort=-1, + p_CidrIp="0.0.0.0/0", + ) + ], + ra_DependsOn=[self._vpc, dms_replication_subnet_group], + ) + group.add(dms_security_group) + + # Configuring VPC endpoints as AWS DMS source and target endpoints. 
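+ # Recent DMS engine versions require either public routes or VPC endpoints towards
+ # all source and target endpoints; the Interface endpoint below covers the Kinesis
+ # target.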
+ # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_VPC_Endpoints.html + vpc_endpoint_stream = ec2.VPCEndpoint( + "KinesisVPCEndpoint", + rp_VpcId=self._vpc.ref(), + rp_ServiceName=f"com.amazonaws.{self.region}.kinesis-streams", + p_SubnetIds=[self._public_subnet1.ref(), self._public_subnet2.ref()], + p_SecurityGroupIds=[self._db_security_group.ref(), dms_security_group.ref()], + p_VpcEndpointType="Interface", + ) + group.add(vpc_endpoint_stream) + + source_endpoint = dms.Endpoint( # type: ignore[call-arg,misc] + "DMSSourceEndpoint", + rp_EndpointType="source", + rp_EngineName="postgres", + p_ServerName=self._db.rv_EndpointAddress, + # NOTE: Needs to be integer! + p_Port=self._db.rv_EndpointPort, + p_SslMode="require", + p_Username=self.db_username, + p_Password=self.db_password, + p_DatabaseName="postgres", + p_EndpointIdentifier=f"{self.env_name}-endpoint-source", + ra_DependsOn=[self._db], + ) + target_endpoint = dms.Endpoint( # type: ignore[call-arg,misc] + "DMSTargetEndpoint", + rp_EndpointType="target", + rp_EngineName="kinesis", + p_KinesisSettings=dms.PropEndpointKinesisSettings( + p_StreamArn=self._stream.rv_Arn, + p_MessageFormat="json-unformatted", + # The parameter ServiceAccessRoleArn must be provided and must not be blank. + p_ServiceAccessRoleArn=dms_target_access_role.rv_Arn, + ), + p_EndpointIdentifier=f"{self.env_name}-endpoint-target", + ra_DependsOn=[self._stream, dms_target_access_role, vpc_endpoint_stream], + ) + group.add(source_endpoint) + group.add(target_endpoint) + + # FIXME: Currently hard-coded to table `public.foo`. + map_to_kinesis = { + "rules": [ + { + "rule-type": "selection", + "rule-id": "1", + "rule-name": "DefaultInclude", + "rule-action": "include", + "object-locator": {"schema-name": "public", "table-name": "foo"}, + "filters": [], + }, + # Using the percent wildcard ("%") in "table-settings" rules is + # not supported for source databases as shown following. + # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Tablesettings.html#CHAP_Tasks.CustomizingTasks.TableMapping.SelectionTransformation.Tablesettings.Wildcards + # Here: Exact schema and table required when using object mapping rule with '3.5' engine. + { + "rule-type": "object-mapping", + "rule-id": "2", + "rule-name": "DefaultMapToKinesis", + "rule-action": "map-record-to-record", + "object-locator": {"schema-name": "public", "table-name": "foo"}, + "filters": [], + }, + ] + } + + serverless_replication = dms.ReplicationConfig( # type: ignore[call-arg,misc] + "DMSReplicationConfig", + rp_ReplicationConfigIdentifier=f"{self.env_name}-dms-serverless", + # p_ResourceIdentifier=f"{self.env_name}-dms-serverless-resource", # noqa: ERA001 + rp_ReplicationType="full-load", + rp_SourceEndpointArn=source_endpoint.ref(), + rp_TargetEndpointArn=target_endpoint.ref(), + rp_ComputeConfig=dms.PropReplicationConfigComputeConfig( + rp_MaxCapacityUnits=1, + p_MinCapacityUnits=1, + p_MultiAZ=False, + p_ReplicationSubnetGroupId=dms_replication_subnet_group.ref(), + p_VpcSecurityGroupIds=[self._db_security_group.ref(), dms_security_group.ref()], + ), + rp_TableMappings=map_to_kinesis, + p_ReplicationSettings={ + # https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.CustomizingTasks.TaskSettings.Logging.html + "Logging": { + "EnableLogging": True, + "EnableLogContext": True, + # ERROR: Feature is not accessible. 
+ # TODO: "LogConfiguration": {"EnableTraceOnError": True}, + "LogComponents": [ + {"Id": "COMMON", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "ADDONS", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "DATA_STRUCTURE", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "COMMUNICATION", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "FILE_TRANSFER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "FILE_FACTORY", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "METADATA_MANAGER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "IO", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "PERFORMANCE", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "SORTER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "SOURCE_CAPTURE", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "SOURCE_UNLOAD", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "TABLES_MANAGER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "TARGET_APPLY", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "TARGET_LOAD", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "TASK_MANAGER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "TRANSFORMATION", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + {"Id": "REST_SERVER", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + # Replication Settings document error: Unsupported keys were found: VALIDATOR + # {"Id": "VALIDATOR", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, # noqa: ERA001 + {"Id": "VALIDATOR_EXT", "Severity": "LOGGER_SEVERITY_DETAILED_DEBUG"}, + ], + } + }, + ra_DependsOn=[ + dms_replication_subnet_group, + dms_security_group, + dms_vpc_role, + dms_cloudwatch_role, + dms_target_access_role, + source_endpoint, + target_endpoint, + ], + ) + group.add(serverless_replication) + + return self.add(group) + + @property + def stream_arn(self): + return self._stream.rv_Arn + + def processor(self, proc: LambdaFactory): + """ + Manifest the main processor component of this pipeline. + """ + self._processor = proc.make(self, environment=self.environment) + return self.add(self._processor.group) + + def connect(self): + """ + Connect the event source to the processor. + + https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-create.html + https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-startingposition + + aws kinesis register-stream-consumer \ + --consumer-name con1 \ + --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream + + aws lambda create-event-source-mapping \ + --function-name MyFunction \ + --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \ + --starting-position LATEST \ + --batch-size 100 + """ + if not self._processor: + raise RuntimeError("No processor defined") + if not self._event_source: + raise RuntimeError("No event source defined") + + # Get a handle to the AWS Lambda for dependency management purposes. + awsfunc = self._processor.function + + # Create a mapping and add it to the stack. + mapping = awslambda.EventSourceMapping( + id="EventSourceToLambdaMapping", + rp_FunctionName=awsfunc.p_FunctionName, + p_EventSourceArn=self._event_source.rv_Arn, + p_BatchSize=2500, + # LATEST - Read only new records. + # TRIM_HORIZON - Process all available records. + # AT_TIMESTAMP - Specify a time from which to start reading records. 
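+ # TRIM_HORIZON is used here so that records which DMS already wrote to the stream
+ # before this mapping existed are not skipped.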
+ p_StartingPosition="TRIM_HORIZON", + ra_DependsOn=awsfunc, + ) + return self.add(mapping) diff --git a/lorrystream/carabas/aws/stack.py b/lorrystream/carabas/aws/stack/dynamodb.py similarity index 67% rename from lorrystream/carabas/aws/stack.py rename to lorrystream/carabas/aws/stack/dynamodb.py index 5ad5e1a..cb76fc7 100644 --- a/lorrystream/carabas/aws/stack.py +++ b/lorrystream/carabas/aws/stack/dynamodb.py @@ -2,19 +2,18 @@ import typing as t import attr -import botocore from cottonformation import ResourceGroup from cottonformation.res import awslambda, dynamodb, kinesis from cottonformation.res.dynamodb import PropTableKinesisStreamSpecification -from lorrystream.carabas.aws.function.model import LambdaFactory, LambdaResource -from lorrystream.carabas.aws.model import GenericEnvStack +from lorrystream.carabas.aws.function.model import LambdaFactory +from lorrystream.carabas.aws.model import KinesisProcessorStack logger = logging.getLogger(__name__) @attr.s -class DynamoDBKinesisPipe(GenericEnvStack): +class DynamoDBKinesisPipe(KinesisProcessorStack): """ A description for an AWS CloudFormation stack, relaying DynamoDB CDC information into a sink. It is written down in Python, uses OO, and a fluent API. @@ -34,9 +33,6 @@ class DynamoDBKinesisPipe(GenericEnvStack): environment: t.Dict[str, str] = attr.ib(factory=dict) - _event_source: t.Optional[t.Union[kinesis.Stream]] = None - _processor: t.Optional[LambdaResource] = None - def table(self): """ aws dynamodb create-table \ @@ -143,52 +139,3 @@ def connect(self): ra_DependsOn=awsfunc, ) return self.add(mapping) - - def deploy_processor_image(self): - """ - Make an already running Lambda pick up a newly published OCI image. - - This is an imperative function executed orthogonally to the CloudFormation deployment. - - It follows this procedure: - - Acquire the `Arn` Output of the Stack's core processor Lambda. - - Use it to look up a handle to the actual Lambda information. - - From the information unit, extract the OCI image URI. - - Instruct the machinery to update the Lambda function code, - effectively respawning the container running it. - """ - if not self._processor: - logger.warning("No processor defined, skip deploying processor OCI image") - return None - function_id = self._processor.function.id - - # Inquire Stack Output. - logger.info(f"Discovering Lambda function existence: {function_id}") - output_id = f"{function_id}Arn" - try: - function_arn = self.get_output_value(self._bsm, output_id) - except botocore.exceptions.ClientError as ex: - if "does not exist" not in str(ex): - raise - logger.info(f"Stack not found or incomplete: {self.stack_name}") - return None - except KeyError: - logger.info(f"Stack not found or incomplete. Output not found: {output_id}") - return None - - # Inquire AWS API and eventually update Lambda code. - client = self._bsm.get_client("lambda") - try: - if func := client.get_function(FunctionName=function_arn): - logger.info(f"Found Lambda function: {function_arn}") - oci_uri = func["Code"]["ImageUri"] - logger.info(f"Deploying new OCI image to Lambda function: {oci_uri}") - response = client.update_function_code(FunctionName=function_arn, ImageUri=oci_uri) - last_status_message = response["LastUpdateStatusReason"] - logger.info(f"Lambda update status response: {last_status_message}") - except Exception as ex: - if ex.__class__.__name__ != "ResourceNotFoundException": - raise - logger.info(f"Lambda function to update OCI image not found: {function_arn}") - - return self