Skip to content

Commit 3ffae15

Browse files
committed
Carabas: Add example DMS Serverless stack
1 parent 0c34660 commit 3ffae15

File tree

6 files changed

+758
-60
lines changed

6 files changed

+758
-60
lines changed
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import logging
2+
3+
from lorrystream.carabas.aws import RDSPostgreSQLDMSKinesisPipe
4+
from lorrystream.util.common import setup_logging
5+
6+
logger = logging.getLogger(__name__)
7+
8+
9+
def main():
10+
"""
11+
A recipe to deploy a data relay stack to Amazon AWS.
12+
13+
Pipeline:
14+
- RDS PostgreSQL -> DMS -> Kinesis Stream -> Python Lambda via OCI -> CrateDB
15+
16+
Ingredients:
17+
- DMS, RDS PostgreSQL, Kinesis
18+
- Lambda function, shipped per OCI image
19+
- CrateDB Cloud
20+
21+
Prerequisites: Register an OCI repository.
22+
"""
23+
24+
# Build and publish OCI image that includes the AWS Lambda function.
25+
"""
26+
python_image = LambdaPythonImage(
27+
name="cratedb-kinesis-lambda",
28+
entrypoint_file=Path("./lorrystream/process/kinesis_cratedb_lambda.py"),
29+
entrypoint_handler="kinesis_cratedb_lambda.handler",
30+
)
31+
python_image.publish()
32+
"""
33+
34+
# Define an AWS CloudFormation software stack.
35+
stack = RDSPostgreSQLDMSKinesisPipe(
36+
project="testdrive-dms-postgresql",
37+
stage="dev",
38+
region="eu-central-1",
39+
description="RDS PostgreSQL > DMS -> Kinesis Stream -> Python Lambda via OCI -> CrateDB",
40+
db_username="dynapipe",
41+
db_password="secret11", # noqa: S106
42+
environment={
43+
"SINK_SQLALCHEMY_URL": "crate://admin:dZ..qB@example.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true",
44+
"SINK_TABLE": "transactions",
45+
},
46+
)
47+
48+
# Add components to the stack.
49+
"""
50+
stack.table().processor(
51+
LambdaFactory(
52+
name="DynamoDBCrateDBProcessor",
53+
oci_uri=python_image.uri,
54+
handler=python_image.entrypoint_handler,
55+
)
56+
).connect()
57+
"""
58+
stack.vpc().database().stream().dms() # .table()
59+
60+
# Deploy stack.
61+
stack.deploy()
62+
logger.info(f"Deployed stack: {stack}")
63+
64+
# Refresh the OCI image.
65+
# TODO: Detect when changed.
66+
stack.deploy_processor_image()
67+
68+
PublicDbEndpoint = stack.get_output_value(stack._bsm, "PublicDbEndpoint")
69+
PublicDbPort = stack.get_output_value(stack._bsm, "PublicDbPort")
70+
psql_command = (
71+
f'psql "postgresql://{stack.db_username}:{stack.db_password}@{PublicDbEndpoint}:{PublicDbPort}/postgres"'
72+
)
73+
print(psql_command)
74+
75+
print("Stream ARN:", stack.get_output_value(stack._bsm, "StreamArn"))
76+
77+
"""
78+
aws dms describe-replications
79+
aws dms start-replication \
80+
--start-replication-type=start-replication \
81+
--replication-config-arn arn:aws:dms:eu-central-1:931394475905:replication-config:LB2JAGY7XFB7PA7HEX3MI36CUA
82+
83+
aws logs describe-log-groups
84+
aws logs start-live-tail --log-group-identifiers \
85+
arn:aws:logs:eu-central-1:931394475905:log-group:/aws/rds/instance/testdrive-dms-postgresql-dev-db/postgresql \
86+
arn:aws:logs:eu-central-1:931394475905:log-group:dms-serverless-replication-LB2JAGY7XFB7PA7HEX3MI36CUA
87+
88+
aws cloudformation continue-update-rollback --stack-name testdrive-dms-postgresql-dev
89+
aws cloudformation delete-stack --stack-name testdrive-dms-postgresql-dev
90+
"""
91+
"""
92+
- https://docs.aws.amazon.com/dms/latest/APIReference/API_StartReplication.html#DMS-StartReplication-request-StartReplicationType
93+
- https://docs.aws.amazon.com/cli/latest/reference/dms/start-replication-task.html
94+
95+
Possible values:
96+
97+
- start-replication
98+
- resume-processing
99+
- reload-target
100+
"""
101+
102+
103+
if __name__ == "__main__":
104+
setup_logging()
105+
main()

lorrystream/carabas/aws/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
from lorrystream.carabas.aws.function.model import LambdaFactory
22
from lorrystream.carabas.aws.function.oci import LambdaPythonImage
3-
from lorrystream.carabas.aws.stack import DynamoDBKinesisPipe
3+
from lorrystream.carabas.aws.stack.dms import RDSPostgreSQLDMSKinesisPipe
4+
from lorrystream.carabas.aws.stack.dynamodb import DynamoDBKinesisPipe
45

56
__all__ = [
7+
"DynamoDBKinesisPipe",
68
"LambdaFactory",
79
"LambdaPythonImage",
8-
"DynamoDBKinesisPipe",
10+
"RDSPostgreSQLDMSKinesisPipe",
911
]

lorrystream/carabas/aws/model.py

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
import logging
2+
import typing as t
23

34
import attr
5+
import botocore
46
import cottonformation as cf
57
from aws_cloudformation import Parameter
68
from boto_session_manager import BotoSesManager
9+
from cottonformation.res import kinesis
10+
11+
if t.TYPE_CHECKING:
12+
from lorrystream.carabas.aws.function.model import LambdaResource
713

814
logger = logging.getLogger(__name__)
915

@@ -27,11 +33,12 @@ def post_hook(self):
2733
self.template.Description = self.description
2834
self.define_parameters()
2935

30-
def add(self, thing):
36+
def add(self, *things):
3137
"""
3238
A shortcut function to add a component to the current template of this Stack.
3339
"""
34-
self.template.add(thing)
40+
for thing in things:
41+
self.template.add(thing)
3542
return self
3643

3744
@property
@@ -87,5 +94,68 @@ def deploy(self, respawn: bool = False):
8794
include_named_iam=True,
8895
verbose=True,
8996
skip_prompt=True,
97+
# 300 seconds are not enough to wait for RDS PostgreSQL, for example.
98+
timeout=500,
9099
)
91100
return self
101+
102+
103+
@attr.s
104+
class GenericProcessorStack(GenericEnvStack):
105+
106+
_processor: t.Optional["LambdaResource"] = None
107+
108+
def deploy_processor_image(self):
109+
"""
110+
Make an already running Lambda pick up a newly published OCI image.
111+
112+
This is an imperative function executed orthogonally to the CloudFormation deployment.
113+
114+
It follows this procedure:
115+
- Acquire the `<FunctionName>Arn` Output of the Stack's core processor Lambda.
116+
- Use it to look up a handle to the actual Lambda information.
117+
- From the information unit, extract the OCI image URI.
118+
- Instruct the machinery to update the Lambda function code,
119+
effectively respawning the container running it.
120+
"""
121+
if not self._processor:
122+
logger.warning("No processor defined, skip deploying processor OCI image")
123+
return None
124+
function_id = self._processor.function.id
125+
126+
# Inquire Stack Output.
127+
logger.info(f"Discovering Lambda function existence: {function_id}")
128+
output_id = f"{function_id}Arn"
129+
try:
130+
function_arn = self.get_output_value(self._bsm, output_id)
131+
except botocore.exceptions.ClientError as ex:
132+
if "does not exist" not in str(ex):
133+
raise
134+
logger.info(f"Stack not found or incomplete: {self.stack_name}")
135+
return None
136+
except KeyError:
137+
logger.info(f"Stack not found or incomplete. Output not found: {output_id}")
138+
return None
139+
140+
# Inquire AWS API and eventually update Lambda code.
141+
client = self._bsm.get_client("lambda")
142+
try:
143+
if func := client.get_function(FunctionName=function_arn):
144+
logger.info(f"Found Lambda function: {function_arn}")
145+
oci_uri = func["Code"]["ImageUri"]
146+
logger.info(f"Deploying new OCI image to Lambda function: {oci_uri}")
147+
response = client.update_function_code(FunctionName=function_arn, ImageUri=oci_uri)
148+
last_status_message = response["LastUpdateStatusReason"]
149+
logger.info(f"Lambda update status response: {last_status_message}")
150+
except Exception as ex:
151+
if ex.__class__.__name__ != "ResourceNotFoundException":
152+
raise
153+
logger.info(f"Lambda function to update OCI image not found: {function_arn}")
154+
155+
return self
156+
157+
158+
@attr.s
159+
class KinesisProcessorStack(GenericProcessorStack):
160+
161+
_event_source: t.Optional[t.Union[kinesis.Stream]] = None

lorrystream/carabas/aws/stack/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)