Skip to content

Commit

Permalink
Carabas: Add example DMS Serverless stack
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Jul 28, 2024
1 parent 0c34660 commit 12a7c3e
Showing 6 changed files with 758 additions and 60 deletions.
105 changes: 105 additions & 0 deletions examples/aws/rds_postgresql_kinesis_lambda_oci_cratedb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import logging

from lorrystream.carabas.aws import RDSPostgreSQLDMSKinesisPipe
from lorrystream.util.common import setup_logging

logger = logging.getLogger(__name__)


def main():
"""
A recipe to deploy a data migration stack to Amazon AWS.
Pipeline:
- RDS PostgreSQL -> DMS -> Kinesis Stream -> Python Lambda via OCI -> CrateDB
Ingredients:
- DMS, RDS PostgreSQL, Kinesis
- Lambda function, shipped per OCI image
- CrateDB Cloud
Prerequisites: Register an OCI repository.
"""

# Build and publish OCI image that includes the AWS Lambda function.
"""
python_image = LambdaPythonImage(
name="cratedb-kinesis-lambda",
entrypoint_file=Path("./lorrystream/process/kinesis_cratedb_lambda.py"),
entrypoint_handler="kinesis_cratedb_lambda.handler",
)
python_image.publish()
"""

# Define an AWS CloudFormation software stack.
stack = RDSPostgreSQLDMSKinesisPipe(
project="testdrive-dms-postgresql",
stage="dev",
region="eu-central-1",
description="RDS PostgreSQL > DMS -> Kinesis Stream -> Python Lambda via OCI -> CrateDB",
db_username="dynapipe",
db_password="secret11", # noqa: S106
environment={
"SINK_SQLALCHEMY_URL": "crate://admin:dZ..qB@example.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true",
"SINK_TABLE": "transactions",
},
)

# Add components to the stack.
"""
stack.table().processor(
LambdaFactory(
name="DynamoDBCrateDBProcessor",
oci_uri=python_image.uri,
handler=python_image.entrypoint_handler,
)
).connect()
"""
stack.vpc().database().stream().dms() # .table()

# Deploy stack.
stack.deploy()
logger.info(f"Deployed stack: {stack}")

# Refresh the OCI image.
# TODO: Detect when changed.
stack.deploy_processor_image()

PublicDbEndpoint = stack.get_output_value(stack._bsm, "PublicDbEndpoint")
PublicDbPort = stack.get_output_value(stack._bsm, "PublicDbPort")
psql_command = (
f'psql "postgresql://{stack.db_username}:{stack.db_password}@{PublicDbEndpoint}:{PublicDbPort}/postgres"'
)
print(psql_command)

print("Stream ARN:", stack.get_output_value(stack._bsm, "StreamArn"))

"""
aws dms describe-replications
aws dms start-replication \
--start-replication-type=start-replication \
--replication-config-arn arn:aws:dms:eu-central-1:931394475905:replication-config:LB2JAGY7XFB7PA7HEX3MI36CUA
aws logs describe-log-groups
aws logs start-live-tail --log-group-identifiers \
arn:aws:logs:eu-central-1:931394475905:log-group:/aws/rds/instance/testdrive-dms-postgresql-dev-db/postgresql \
arn:aws:logs:eu-central-1:931394475905:log-group:dms-serverless-replication-LB2JAGY7XFB7PA7HEX3MI36CUA
aws cloudformation continue-update-rollback --stack-name testdrive-dms-postgresql-dev
aws cloudformation delete-stack --stack-name testdrive-dms-postgresql-dev
"""
"""
- https://docs.aws.amazon.com/dms/latest/APIReference/API_StartReplication.html#DMS-StartReplication-request-StartReplicationType
- https://docs.aws.amazon.com/cli/latest/reference/dms/start-replication-task.html
Possible values:
- start-replication
- resume-processing
- reload-target
"""


if __name__ == "__main__":
setup_logging()
main()
6 changes: 4 additions & 2 deletions lorrystream/carabas/aws/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from lorrystream.carabas.aws.function.model import LambdaFactory
from lorrystream.carabas.aws.function.oci import LambdaPythonImage
from lorrystream.carabas.aws.stack import DynamoDBKinesisPipe
from lorrystream.carabas.aws.stack.dms import RDSPostgreSQLDMSKinesisPipe
from lorrystream.carabas.aws.stack.dynamodb import DynamoDBKinesisPipe

__all__ = [
"DynamoDBKinesisPipe",
"LambdaFactory",
"LambdaPythonImage",
"DynamoDBKinesisPipe",
"RDSPostgreSQLDMSKinesisPipe",
]
74 changes: 72 additions & 2 deletions lorrystream/carabas/aws/model.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import logging
import typing as t

import attr
import botocore
import cottonformation as cf
from aws_cloudformation import Parameter
from boto_session_manager import BotoSesManager
from cottonformation.res import kinesis

if t.TYPE_CHECKING:
from lorrystream.carabas.aws.function.model import LambdaResource

logger = logging.getLogger(__name__)

@@ -27,11 +33,12 @@ def post_hook(self):
self.template.Description = self.description
self.define_parameters()

def add(self, thing):
def add(self, *things):
"""
A shortcut function to add a component to the current template of this Stack.
"""
self.template.add(thing)
for thing in things:
self.template.add(thing)
return self

@property
@@ -87,5 +94,68 @@ def deploy(self, respawn: bool = False):
include_named_iam=True,
verbose=True,
skip_prompt=True,
# 300 seconds are not enough to wait for RDS PostgreSQL, for example.
timeout=500,
)
return self


@attr.s
class GenericProcessorStack(GenericEnvStack):

_processor: t.Optional["LambdaResource"] = None

def deploy_processor_image(self):
"""
Make an already running Lambda pick up a newly published OCI image.
This is an imperative function executed orthogonally to the CloudFormation deployment.
It follows this procedure:
- Acquire the `<FunctionName>Arn` Output of the Stack's core processor Lambda.
- Use it to look up a handle to the actual Lambda information.
- From the information unit, extract the OCI image URI.
- Instruct the machinery to update the Lambda function code,
effectively respawning the container running it.
"""
if not self._processor:
logger.warning("No processor defined, skip deploying processor OCI image")
return None
function_id = self._processor.function.id

# Inquire Stack Output.
logger.info(f"Discovering Lambda function existence: {function_id}")
output_id = f"{function_id}Arn"
try:
function_arn = self.get_output_value(self._bsm, output_id)
except botocore.exceptions.ClientError as ex:
if "does not exist" not in str(ex):
raise
logger.info(f"Stack not found or incomplete: {self.stack_name}")
return None
except KeyError:
logger.info(f"Stack not found or incomplete. Output not found: {output_id}")
return None

# Inquire AWS API and eventually update Lambda code.
client = self._bsm.get_client("lambda")
try:
if func := client.get_function(FunctionName=function_arn):
logger.info(f"Found Lambda function: {function_arn}")
oci_uri = func["Code"]["ImageUri"]
logger.info(f"Deploying new OCI image to Lambda function: {oci_uri}")
response = client.update_function_code(FunctionName=function_arn, ImageUri=oci_uri)
last_status_message = response["LastUpdateStatusReason"]
logger.info(f"Lambda update status response: {last_status_message}")
except Exception as ex:
if ex.__class__.__name__ != "ResourceNotFoundException":
raise
logger.info(f"Lambda function to update OCI image not found: {function_arn}")

return self


@attr.s
class KinesisProcessorStack(GenericProcessorStack):

_event_source: t.Optional[t.Union[kinesis.Stream]] = None
Empty file.
Loading

0 comments on commit 12a7c3e

Please sign in to comment.