Skip to content

Commit 55cf456

Browse files
committed
Carabas: Subsystem to run pipeline elements on other people's machines
1 parent 17cb85b commit 55cf456

File tree

19 files changed

+1360
-1
lines changed

19 files changed

+1360
-1
lines changed

doc/pipe/aws/lambda.md

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
# Pipelines with AWS Lambda
2+
3+
4+
## What's inside
5+
- A convenient [Infrastructure as code (IaC)] procedure to define data pipelines on [AWS].
6+
- Written in Python, using [AWS CloudFormation] stack deployments. To learn
7+
what's behind, see also [How CloudFormation works].
8+
- Code for running on [AWS Lambda] is packaged into [OCI] images, for efficient
9+
delta transfers, built-in versioning, and testing purposes.
10+
11+
12+
## Details
13+
- This specific document has a few general guidelines, and a
14+
a few specifics coming from `examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py`.
15+
- That program defines a pipeline which looks like this:
16+
17+
DynamoDB CDC -> Kinesis Stream -> Python Lambda via OCI -> CrateDB
18+
19+
20+
## OCI image
21+
In order to package code for AWS Lambda functions packages into OCI images,
22+
and use them, you will need to publish them to the AWS ECR container image
23+
registry.
24+
25+
You will need to authenticate your local Docker environment, and create a
26+
container image repository once for each project using a different runtime
27+
image.
28+
29+
### Authenticate
30+
Define your AWS ID, region label, and repository name, to be able to use
31+
the templated commands 1:1.
32+
```shell
33+
aws_id=831394476016
34+
aws_region=eu-central-1
35+
repository_name=cratedb-kinesis-lambda
36+
```
37+
```shell
38+
aws ecr get-login-password --region=${aws_region} | \
39+
docker login --username AWS --password-stdin ${aws_id}.dkr.ecr.${aws_region}.amazonaws.com
40+
```
41+
42+
(ecr-repository)=
43+
### ECR Repository
44+
Just once, before proceeding, create an image repository hosting the runtime
45+
code for your Lambda function.
46+
```shell
47+
aws ecr create-repository --region=${aws_region} \
48+
--repository-name=${repository_name} --image-tag-mutability=MUTABLE
49+
```
50+
In order to allow others to pull that image, you will need to define a
51+
[repository policy] using the [set-repository-policy] subcommend of the AWS CLI.
52+
In order to invoke that command, put the [](project:#ecr-repository-policy)
53+
JSON definition into a file called `policy.json`.
54+
```shell
55+
aws ecr set-repository-policy --repository-name=${repository_name} --policy-text file://policy.json
56+
```
57+
58+
### Troubleshooting
59+
If you receive such an error message, your session has expired, and you need
60+
to re-run the authentication step.
61+
```text
62+
denied: Your authorization token has expired. Reauthenticate and try again.
63+
```
64+
65+
This error message indicates your ECR repository does not exist. The solution
66+
is to create it, using the command shared above.
67+
```text
68+
name unknown: The repository with name 'cratedb-kinesis-lambda' does
69+
not exist in the registry with id '831394476016'
70+
```
71+
72+
73+
## CrateDB Table
74+
The destination table name in CrateDB, where the CDC record
75+
processor will re-materialize CDC events into.
76+
```shell
77+
pip install crash
78+
crash -c "CREATE TABLE transactions (data OBJECT(DYNAMIC));"
79+
```
80+
81+
82+
## Install
83+
In order to exercise the example outlined below, you need to install
84+
Lorrystream.
85+
```shell
86+
pip install 'lorrystream @ git+https://github.com/daq-tools/lorrystream.git@kinesis'
87+
```
88+
89+
90+
## Usage
91+
For exercising an AWS pipeline, you need two components: The IaC description,
92+
and a record processor implementation for the AWS Lambda. For example, choose
93+
those two variants:
94+
95+
- IaC driver: [dynamodb_kinesis_lambda_oci_cratedb.py]
96+
- Record processor: [kinesis_cratedb_lambda.py]
97+
98+
Putting them next to each other into a directory, and adjusting
99+
`LambdaPythonImage(entrypoint_file=...)` should be enough to get you started.
100+
Sure enough, you will also need to configure the `CRATEDB_SQLALCHEMY_URL`
101+
environment variable properly.
102+
103+
Then, just invoke the IaC program to spin up the defined infrastructure on AWS.
104+
105+
106+
## Operations
107+
There are a few utility commands that help you operate the stack, that have not
108+
been absorbed yet. See also [Monitoring and troubleshooting Lambda functions].
109+
110+
### Utilities
111+
Check status of Lambda function.
112+
```shell
113+
aws lambda get-function \
114+
--function-name arn:aws:lambda:eu-central-1:831394476016:function:testdrive-dynamodb-dev-lambda-processor
115+
```
116+
Check status of stream mapping(s).
117+
```shell
118+
aws lambda list-event-source-mappings
119+
```
120+
Check logs.
121+
```shell
122+
aws logs describe-log-groups
123+
aws logs start-live-tail --log-group-identifiers arn:aws:logs:eu-central-1:831394476016:log-group:/aws/lambda/DynamoDBCrateDBProcessor
124+
```
125+
126+
### Test Flight I
127+
Invoke the Lambda function for testing purposes.
128+
```shell
129+
aws lambda invoke \
130+
--function-name DynamoDBCrateDBProcessor \
131+
--payload file://records.json outputfile.txt
132+
```
133+
Pick `records.json` from [](project:#kinesis-example-event), it is a basic
134+
example of an AWS Kinesis event message.
135+
136+
:::{note}
137+
On AWS CLI v2, you may need that additional command line option.
138+
```shell
139+
--cli-binary-format raw-in-base64-out
140+
```
141+
:::
142+
143+
### Test Flight II
144+
Trigger a real event by running two DML operations on the source database table.
145+
```shell
146+
READING_SQL="{'timestamp': '2024-07-12T01:17:42', 'device': 'foo', 'temperature': 42.42, 'humidity': 84.84}"
147+
148+
aws dynamodb execute-statement --statement \
149+
"INSERT INTO \"table-testdrive\" VALUE ${READING_SQL};"
150+
151+
aws dynamodb execute-statement --statement \
152+
"UPDATE \"table-testdrive\" SET temperature=43.59 WHERE \"device\"='foo' AND \"timestamp\"='2024-07-12T01:17:42';"
153+
```
154+
155+
156+
## Appendix
157+
158+
(ecr-repository-policy)=
159+
### ECR Repository Policy
160+
```json
161+
{
162+
"Version": "2008-10-17",
163+
"Statement": [
164+
{
165+
"Sid": "allow public pull",
166+
"Effect": "Allow",
167+
"Principal": "*",
168+
"Action": [
169+
"ecr:BatchCheckLayerAvailability",
170+
"ecr:BatchGetImage",
171+
"ecr:GetDownloadUrlForLayer"
172+
]
173+
}
174+
]
175+
}
176+
```
177+
178+
(kinesis-example-event)=
179+
### Kinesis Example Event
180+
```json
181+
{
182+
"Records": [
183+
{
184+
"kinesis": {
185+
"kinesisSchemaVersion": "1.0",
186+
"partitionKey": "1",
187+
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
188+
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
189+
"approximateArrivalTimestamp": 1545084650.987
190+
},
191+
"eventSource": "aws:kinesis",
192+
"eventVersion": "1.0",
193+
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
194+
"eventName": "aws:kinesis:record",
195+
"invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
196+
"awsRegion": "us-east-2",
197+
"eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
198+
}
199+
]
200+
}
201+
```
202+
203+
204+
[AWS]: https://en.wikipedia.org/wiki/Amazon_Web_Services
205+
[AWS CloudFormation]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/Welcome.html
206+
[AWS Lambda]: https://en.wikipedia.org/wiki/AWS_Lambda
207+
[dynamodb_kinesis_lambda_oci_cratedb.py]: https://github.com/daq-tools/lorrystream/blob/main/examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py
208+
[example program]: https://github.com/daq-tools/lorrystream/tree/main/examples/aws
209+
[How CloudFormation works]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cloudformation-overview.html
210+
[Infrastructure as code (IaC)]: https://en.wikipedia.org/wiki/Infrastructure_as_code
211+
[kinesis_cratedb_lambda.py]: https://github.com/daq-tools/lorrystream/blob/main/lorrystream/process/kinesis_cratedb_lambda.py
212+
[Monitoring and troubleshooting Lambda functions]: https://docs.aws.amazon.com/lambda/latest/dg/lambda-monitoring.html
213+
[OCI]: https://en.wikipedia.org/wiki/Open_Container_Initiative
214+
[repository policy]: https://docs.aws.amazon.com/lambda/latest/dg/images-create.html#gettingstarted-images-permissions
215+
[set-repository-policy]: https://docs.aws.amazon.com/cli/latest/reference/ecr/set-repository-policy.html
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import logging
2+
from pathlib import Path
3+
4+
from lorrystream.carabas.aws import DynamoDBKinesisPipe, LambdaFactory, LambdaPythonImage
5+
from lorrystream.util.common import setup_logging
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
def main():
11+
"""
12+
A recipe to deploy a data relay stack to Amazon AWS.
13+
14+
Pipeline:
15+
- DynamoDB CDC -> Kinesis Stream -> Python Lambda via OCI -> CrateDB
16+
17+
Ingredients:
18+
- DynamoDB CDC to Kinesis
19+
- Lambda function, shipped per OCI image
20+
- CrateDB Cloud
21+
22+
Prerequisites: Register an OCI repository.
23+
"""
24+
25+
# Build and publish OCI image that includes the AWS Lambda function.
26+
python_image = LambdaPythonImage(
27+
name="cratedb-kinesis-lambda",
28+
entrypoint_file=Path("./lorrystream/process/kinesis_cratedb_lambda.py"),
29+
entrypoint_handler="kinesis_cratedb_lambda.handler",
30+
)
31+
python_image.publish()
32+
33+
# Define an AWS CloudFormation software stack.
34+
stack = DynamoDBKinesisPipe(
35+
project="testdrive-dynamodb",
36+
stage="dev",
37+
region="eu-central-1",
38+
description="DynamoDB CDC -> Kinesis Stream -> Python Lambda via OCI -> CrateDB",
39+
table_name="table-testdrive",
40+
stream_name="dynamodb-cdc",
41+
environment={
42+
"CRATEDB_SQLALCHEMY_URL": "crate://admin:dZ..qB@example.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true",
43+
"CRATEDB_TABLE": "transactions",
44+
},
45+
)
46+
47+
# Add components to the stack.
48+
stack.table().processor(
49+
LambdaFactory(
50+
name="DynamoDBCrateDBProcessor",
51+
oci_uri=python_image.uri,
52+
handler=python_image.entrypoint_handler,
53+
)
54+
).connect()
55+
56+
# Deploy stack.
57+
stack.deploy()
58+
logger.info(f"Deployed stack: {stack}")
59+
60+
# Refresh the OCI image.
61+
# TODO: Detect when changed.
62+
stack.deploy_processor_image()
63+
64+
65+
if __name__ == "__main__":
66+
setup_logging()
67+
main()

lorrystream/carabas/README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Carabas
2+
3+
A subsystem to divert workloads to other people's computers.
4+
Workloads can be whole pipelines or elements of pipelines.
5+
Provides blended computing environments on your fingertips.
6+
7+
## Etymology
8+
- [Marquis von Carabas]
9+
- [Die Meisterkatze oder der gestiefelte Kater]
10+
- [Le Maître chat ou le Chat botté]
11+
- [Puss in Boots]
12+
13+
14+
[Die Meisterkatze oder der gestiefelte Kater]: https://de.frwiki.wiki/wiki/Le_Ma%C3%AEtre_chat_ou_le_Chat_bott%C3%A9
15+
[Le Maître chat ou le Chat botté]: https://fr.wikipedia.org/wiki/Le_Ma%C3%AEtre_chat_ou_le_Chat_bott%C3%A9
16+
[Marquis von Carabas]: https://de.frwiki.wiki/wiki/Marquis_de_Carabas
17+
[Puss in Boots]: https://en.wikipedia.org/wiki/Puss_in_Boots

lorrystream/carabas/__init__.py

Whitespace-only changes.

lorrystream/carabas/aws/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from lorrystream.carabas.aws.function.model import LambdaFactory
2+
from lorrystream.carabas.aws.function.oci import LambdaPythonImage
3+
from lorrystream.carabas.aws.stack import DynamoDBKinesisPipe
4+
5+
__all__ = [
6+
"LambdaFactory",
7+
"LambdaPythonImage",
8+
"DynamoDBKinesisPipe",
9+
]

lorrystream/carabas/aws/function/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)