Skip to content

Commit a7f6695

Browse files
committed
Carabas: Subsystem to run pipeline elements on other people's machines
1 parent 17cb85b commit a7f6695

File tree

19 files changed

+1350
-1
lines changed

19 files changed

+1350
-1
lines changed

doc/pipe/aws/lambda.md

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
# Pipelines with AWS Lambda
2+
3+
4+
## What's inside
5+
- A convenient [Infrastructure as code (IaC)] procedure to define data pipelines on [AWS].
6+
- Written in Python, using [AWS CloudFormation] stack deployments. To learn
7+
what's behind, see also [How CloudFormation works].
8+
- Code for running on [AWS Lambda] is packaged into [OCI] images, for efficient
9+
delta transfers, built-in versioning, and testing purposes.
10+
11+
12+
## Details
13+
- This specific document has a few general guidelines, and a
14+
a few specifics coming from `examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py`.
15+
- That program defines a pipeline which looks like this:
16+
17+
DynamoDB CDC -> Kinesis Stream -> Python Lambda via OCI -> CrateDB
18+
19+
20+
## OCI image
21+
In order to package code for AWS Lambda functions packages into OCI images,
22+
and use them, you will need to publish them to the AWS ECR container image
23+
registry.
24+
25+
You will need to authenticate your local Docker environment, and create a
26+
container image repository once for each project using a different runtime
27+
image.
28+
29+
### Authenticate
30+
Define your AWS ID, region label, and repository name, to be able to use
31+
the templated commands 1:1.
32+
```shell
33+
aws_id=831394476016
34+
aws_region=eu-central-1
35+
repository_name=cratedb-kinesis-lambda
36+
```
37+
```shell
38+
aws ecr get-login-password --region=${aws_region} | \
39+
docker login --username AWS --password-stdin ${aws_id}.dkr.ecr.${aws_region}.amazonaws.com
40+
```
41+
42+
### ECR Repository
43+
Just once, before proceeding, create an image repository hosting the runtime
44+
code for your Lambda function.
45+
```shell
46+
aws ecr create-repository --region=${aws_region} \
47+
--repository-name=${repository_name} --image-tag-mutability=MUTABLE
48+
```
49+
In order to allow others to pull that image, you will need to define a
50+
[repository policy] using the [set-repository-policy] subcommend of the AWS CLI.
51+
In order to invoke that command, put the [](project:#ecr-repository-policy)
52+
JSON definition into a file called `policy.json`.
53+
```shell
54+
aws ecr set-repository-policy --repository-name=${repository_name} --policy-text file://policy.json
55+
```
56+
57+
### Troubleshooting
58+
If you receive such an error message, your session has expired, and you need
59+
to re-run the authentication step.
60+
```text
61+
denied: Your authorization token has expired. Reauthenticate and try again.
62+
```
63+
64+
65+
## CrateDB Table
66+
The destination table name in CrateDB, where the CDC record
67+
processor will re-materialize CDC events into.
68+
```shell
69+
pip install crash
70+
crash -c "CREATE TABLE transactions (data OBJECT(DYNAMIC));"
71+
```
72+
73+
74+
## Install
75+
In order to exercise the example outlined below, you need to install
76+
Lorrystream.
77+
```shell
78+
pip install 'lorrystream @ git+https://github.com/daq-tools/lorrystream.git@kinesis'
79+
```
80+
81+
82+
## Usage
83+
For exercising an AWS pipeline, you need two components: The IaC description,
84+
and a record processor implementation for the AWS Lambda. For example, choose
85+
those two variants:
86+
87+
- IaC driver: [dynamodb_kinesis_lambda_oci_cratedb.py]
88+
- Record processor: [kinesis_cratedb_lambda.py]
89+
90+
Putting them next to each other into a directory, and adjusting
91+
`LambdaPythonImage(entrypoint_file=...)` should be enough to get you started.
92+
Sure enough, you will also need to configure the `CRATEDB_SQLALCHEMY_URL`
93+
environment variable properly.
94+
95+
Then, just invoke the IaC program to spin up the defined infrastructure on AWS.
96+
97+
98+
## Operations
99+
There are a few utility commands that help you operate the stack, that have not
100+
been absorbed yet. See also [Monitoring and troubleshooting Lambda functions].
101+
102+
### Utilities
103+
Check status of Lambda function.
104+
```shell
105+
aws lambda get-function \
106+
--function-name arn:aws:lambda:eu-central-1:831394476016:function:testdrive-dynamodb-dev-lambda-processor
107+
```
108+
Check status of stream mapping(s).
109+
```shell
110+
aws lambda list-event-source-mappings
111+
```
112+
Check logs.
113+
```shell
114+
aws logs describe-log-groups
115+
aws logs start-live-tail --log-group-identifiers arn:aws:logs:eu-central-1:831394476016:log-group:/aws/lambda/DynamoDBCrateDBProcessor
116+
```
117+
118+
### Test Flight I
119+
Invoke the Lambda function for testing purposes.
120+
```shell
121+
aws lambda invoke \
122+
--function-name DynamoDBCrateDBProcessor \
123+
--payload file://records.json outputfile.txt
124+
```
125+
Pick `records.json` from [](project:#kinesis-example-event), it is a basic
126+
example of an AWS Kinesis event message.
127+
128+
:::{note}
129+
On AWS CLI v2, you may need that additional command line option.
130+
```shell
131+
--cli-binary-format raw-in-base64-out
132+
```
133+
:::
134+
135+
### Test Flight II
136+
Trigger a real event by running two DML operations on the source database table.
137+
```shell
138+
READING_SQL="{'timestamp': '2024-07-12T01:17:42', 'device': 'foo', 'temperature': 42.42, 'humidity': 84.84}"
139+
140+
aws dynamodb execute-statement --statement \
141+
"INSERT INTO \"table-testdrive\" VALUE ${READING_SQL};"
142+
143+
aws dynamodb execute-statement --statement \
144+
"UPDATE \"table-testdrive\" SET temperature=43.59 WHERE \"device\"='foo' AND \"timestamp\"='2024-07-12T01:17:42';"
145+
```
146+
147+
148+
## Appendix
149+
150+
(ecr-repository-policy)=
151+
### ECR Repository Policy
152+
```json
153+
{
154+
"Version": "2008-10-17",
155+
"Statement": [
156+
{
157+
"Sid": "allow public pull",
158+
"Effect": "Allow",
159+
"Principal": "*",
160+
"Action": [
161+
"ecr:BatchCheckLayerAvailability",
162+
"ecr:BatchGetImage",
163+
"ecr:GetDownloadUrlForLayer"
164+
]
165+
}
166+
]
167+
}
168+
```
169+
170+
(kinesis-example-event)=
171+
### Kinesis Example Event
172+
```json
173+
{
174+
"Records": [
175+
{
176+
"kinesis": {
177+
"kinesisSchemaVersion": "1.0",
178+
"partitionKey": "1",
179+
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
180+
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
181+
"approximateArrivalTimestamp": 1545084650.987
182+
},
183+
"eventSource": "aws:kinesis",
184+
"eventVersion": "1.0",
185+
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
186+
"eventName": "aws:kinesis:record",
187+
"invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
188+
"awsRegion": "us-east-2",
189+
"eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
190+
}
191+
]
192+
}
193+
```
194+
195+
196+
[AWS]: https://en.wikipedia.org/wiki/Amazon_Web_Services
197+
[AWS CloudFormation]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/Welcome.html
198+
[AWS Lambda]: https://en.wikipedia.org/wiki/AWS_Lambda
199+
[dynamodb_kinesis_lambda_oci_cratedb.py]: https://github.com/daq-tools/lorrystream/blob/main/examples/aws/dynamodb_kinesis_lambda_oci_cratedb.py
200+
[example program]: https://github.com/daq-tools/lorrystream/tree/main/examples/aws
201+
[How CloudFormation works]: https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cloudformation-overview.html
202+
[Infrastructure as code (IaC)]: https://en.wikipedia.org/wiki/Infrastructure_as_code
203+
[kinesis_cratedb_lambda.py]: https://github.com/daq-tools/lorrystream/blob/main/lorrystream/process/kinesis_cratedb_lambda.py
204+
[Monitoring and troubleshooting Lambda functions]: https://docs.aws.amazon.com/lambda/latest/dg/lambda-monitoring.html
205+
[OCI]: https://en.wikipedia.org/wiki/Open_Container_Initiative
206+
[repository policy]: https://docs.aws.amazon.com/lambda/latest/dg/images-create.html#gettingstarted-images-permissions
207+
[set-repository-policy]: https://docs.aws.amazon.com/cli/latest/reference/ecr/set-repository-policy.html
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import logging
2+
from pathlib import Path
3+
4+
from lorrystream.carabas.aws import DynamoDBKinesisPipe, LambdaFactory, LambdaPythonImage
5+
from lorrystream.util.common import setup_logging
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
def main():
11+
"""
12+
A recipe to deploy a data relay stack to Amazon AWS.
13+
14+
Pipeline:
15+
- DynamoDB CDC -> Kinesis Stream -> Python Lambda via OCI -> CrateDB
16+
17+
Ingredients:
18+
- DynamoDB CDC to Kinesis
19+
- Lambda function, shipped per OCI image
20+
- CrateDB Cloud
21+
22+
Prerequisites: Register an OCI repository.
23+
"""
24+
25+
# Build and publish OCI image that includes the AWS Lambda function.
26+
python_image = LambdaPythonImage(
27+
name="cratedb-kinesis-lambda",
28+
entrypoint_file=Path("./lorrystream/process/kinesis_cratedb_lambda.py"),
29+
entrypoint_handler="kinesis_cratedb_lambda.handler",
30+
)
31+
python_image.publish()
32+
33+
# Define an AWS CloudFormation software stack.
34+
stack = DynamoDBKinesisPipe(
35+
project="testdrive-dynamodb",
36+
stage="dev",
37+
region="eu-central-1",
38+
description="DynamoDB CDC -> Kinesis Stream -> Python Lambda via OCI -> CrateDB",
39+
table_name="table-testdrive",
40+
stream_name="dynamodb-cdc",
41+
environment={
42+
"CRATEDB_SQLALCHEMY_URL": "crate://admin:dZ..qB@example.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true",
43+
"CRATEDB_TABLE": "transactions",
44+
},
45+
)
46+
47+
# Add components to the stack.
48+
stack.table().processor(
49+
LambdaFactory(
50+
name="DynamoDBCrateDBProcessor",
51+
oci_uri=python_image.uri,
52+
handler=python_image.entrypoint_handler,
53+
)
54+
).connect()
55+
56+
# Deploy stack.
57+
stack.deploy()
58+
59+
logger.info(f"Deployed stack: {stack}")
60+
61+
62+
if __name__ == "__main__":
63+
setup_logging()
64+
main()

lorrystream/carabas/README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Carabas
2+
3+
A subsystem to divert workloads to other people's computers.
4+
Workloads can be whole pipelines or elements of pipelines.
5+
Provides blended computing environments on your fingertips.
6+
7+
## Etymology
8+
- [Marquis von Carabas]
9+
- [Die Meisterkatze oder der gestiefelte Kater]
10+
- [Le Maître chat ou le Chat botté]
11+
- [Puss in Boots]
12+
13+
14+
[Die Meisterkatze oder der gestiefelte Kater]: https://de.frwiki.wiki/wiki/Le_Ma%C3%AEtre_chat_ou_le_Chat_bott%C3%A9
15+
[Le Maître chat ou le Chat botté]: https://fr.wikipedia.org/wiki/Le_Ma%C3%AEtre_chat_ou_le_Chat_bott%C3%A9
16+
[Marquis von Carabas]: https://de.frwiki.wiki/wiki/Marquis_de_Carabas
17+
[Puss in Boots]: https://en.wikipedia.org/wiki/Puss_in_Boots

lorrystream/carabas/__init__.py

Whitespace-only changes.

lorrystream/carabas/aws/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from lorrystream.carabas.aws.function.model import LambdaFactory
2+
from lorrystream.carabas.aws.function.oci import LambdaPythonImage
3+
from lorrystream.carabas.aws.stack import DynamoDBKinesisPipe
4+
5+
__all__ = [
6+
"LambdaFactory",
7+
"LambdaPythonImage",
8+
"DynamoDBKinesisPipe",
9+
]

lorrystream/carabas/aws/function/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)