Carabas: Kinesis, DynamoDB, DMS - CDC record processing in different flavours #120

Merged
merged 28 commits into from
Aug 16, 2024
d3079c8
Kinesis: Basic record processor application skeleton
amotl Jul 11, 2024
e99e912
DynamoDB: Capture change stream, using Kinesis on AWS Cloud
amotl Jul 12, 2024
f9b5679
DynamoDB: Decode CDC event records
amotl Jul 12, 2024
908f50a
DynamoDB: "DynamoDB Streams Kinesis Adapter" project is dead
amotl Jul 12, 2024
155d172
DynamoDB: Get CDC event to SQL translator right, improve KCLv2 launcher
amotl Jul 13, 2024
afd7997
Carabas: Subsystem to run pipeline elements on other people's machines
amotl Jul 23, 2024
ff87590
Carabas: Consolidate documentation
amotl Jul 25, 2024
475cd7c
Kinesis/DynamoDB: Refactor KCLv2 implementation to `lorrystream.spike`
amotl Jul 25, 2024
a328ef9
Project: Provide `__appname__` and `__version__` symbols
amotl Jul 25, 2024
15f0f72
Kinesis/DynamoDB: Improve Lambda
amotl Jul 25, 2024
496523f
Kinesis/DynamoDB: Configure Lambda
amotl Jul 25, 2024
54ad149
Kinesis: Refactor basic publish/subscribe programs using async-kinesis
amotl Jul 27, 2024
f95ac65
Carabas: Add updated cottonformation driver for AWS DMS
amotl Jul 27, 2024
5826f37
Carabas/DMS: Add example DMS Serverless stack
amotl Jul 28, 2024
a57d794
Carabas/DMS: Improve CloudFormation stack
amotl Jul 29, 2024
9a165fd
Dependencies: Nail a few dependencies related to software tests
amotl Jul 29, 2024
92dbdac
Tests: Fix timing of software tests
amotl Jul 29, 2024
a5ef1d2
CI: Speed up testing by not tearing down test containers
amotl Jul 29, 2024
717fded
Carabas/DMS: Make it work
amotl Aug 6, 2024
6583e23
Carabas/DMS: Improve configuration. Add software tests.
amotl Aug 6, 2024
5d3ea8c
Carabas/DMS: Improve documentation
amotl Aug 7, 2024
5239131
Chore: Naming things. Rebase aftermath fixes. Run linter.
amotl Aug 7, 2024
4022f69
Carabas/LocalStack: Improve standalone Kinesis usage
amotl Aug 8, 2024
11349e9
Carabas/Lambda/DMS: Add software tests
amotl Aug 8, 2024
6d8cec1
Carabas: Fix documentation
amotl Aug 8, 2024
fcee258
Carabas: Fix CI
amotl Aug 8, 2024
8bf1869
Carabas: Fix CI
Aug 16, 2024
6856a3c
Carabas: Update documentation
Aug 16, 2024
3 changes: 2 additions & 1 deletion .github/workflows/tests.yml
@@ -33,6 +33,7 @@ jobs:
env:
OS: ${{ matrix.os }}
PYTHON: ${{ matrix.python-version }}
TC_KEEPALIVE: true

name: Python ${{ matrix.python-version }} on OS ${{ matrix.os }}
steps:
@@ -56,7 +57,7 @@ jobs:
pip install "setuptools>=64" --upgrade

# Install package in editable mode.
-pip install --use-pep517 --prefer-binary --editable=.[test,develop]
+pip install --use-pep517 --prefer-binary --editable=.[all,test,develop]

- name: Run linter and software tests
run: |
1 change: 1 addition & 0 deletions CHANGES.md
@@ -1,6 +1,7 @@
# Changelog

## in progress
- Carabas: A subsystem to divert workloads to other people’s computers

## 2024-07-10 v0.0.2
- Initial working version, supporting MQTT, AMQP, and SQLAlchemy/CrateDB
2 changes: 2 additions & 0 deletions doc/backlog.rst
@@ -40,6 +40,8 @@ Iteration 2
- [o] Examples: Add ``appsink`` example
- [o] Improve inline docs
- [o] Release 0.1.0
- [o] CSV: https://github.com/alan-turing-institute/CleverCSV
- [o] Excel & ODF: https://github.com/dimastbk/python-calamine


***********
21 changes: 21 additions & 0 deletions doc/carabas/backlog.md
@@ -0,0 +1,21 @@
# Carabas Backlog

## Iteration +1
- [x] Improve type mapping
- [x] Generalize CDC event -> SQL translator
- [ ] Only display debug output of the Docker build process
  when using `--verbose`.
- [ ] Bring back "Zip" use, for interactive hacking
- [ ] Distill into a Lambda variant
- [ ] Automation!
- [ ] DDL: CREATE TABLE <tablename> (data OBJECT(DYNAMIC));
- [ ] Wrap KCL launcher into manager component

## Iteration +2
- [ ] Performance improvements (simdjson?)
- [ ] Use SQLAlchemy for generating and submitting SQL statement
- [ ] Improve efficiency by using bulk operations when applicable
- [ ] is in UPDATE_ROLLBACK_COMPLETE_CLEANUP_IN_PROGRESS state and can not be updated
- [ ] is in ROLLBACK_COMPLETE state and can not be updated.
- [ ] Cannot create a publicly accessible DBInstance. The specified VPC has no
internet gateway attached.Update the VPC and then try again
79 changes: 79 additions & 0 deletions doc/carabas/dms/handbook.md
@@ -0,0 +1,79 @@
(aws-dms-handbook)=
# AWS DMS Handbook

A few useful AWS CLI commands to check the status of the DMS engine and
relevant pipeline elements. You can also use the AWS Web Console to
inspect and manage the same resources.


## Status Checks
Display ARNs of all replication instances.
```shell
aws dms describe-replication-instances | jq -r '.ReplicationInstances[].ReplicationInstanceArn'
```
Display replication endpoints and relevant connection settings.
```shell
aws dms describe-endpoints
```
Invoke connection test on given DMS endpoint.
```shell
aws dms test-connection \
--replication-instance-arn arn:aws:dms:eu-central-1:831394476016:rep:JD2LL6OM35BJZNKZIRSOE2FXIY \
--endpoint-arn arn:aws:dms:eu-central-1:831394476016:endpoint:3IVDGL6E4RDNBF2LFBYF6DYV3Y
```
Display connection test results.
```shell
aws dms describe-connections
```


## Operations
Enumerate all configured replication tasks with compact output.
```shell
aws dms describe-replication-tasks | \
jq '.ReplicationTasks[] | {ReplicationTaskIdentifier, ReplicationTaskArn, MigrationType, StartReplicationType, Status, StopReason, FailureMessages, ProvisionData}'
```
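If you prefer to post-process the output in Python instead of `jq`, the same
projection can be sketched as follows. This is a minimal illustration, not part
of LorryStream: `summarize_tasks` is a hypothetical helper name, and it assumes
the JSON document printed by `aws dms describe-replication-tasks` is passed in
as a string.

```python
import json
from typing import Any, Dict, List

# Fields mirrored from the jq filter above.
FIELDS = (
    "ReplicationTaskIdentifier",
    "ReplicationTaskArn",
    "MigrationType",
    "StartReplicationType",
    "Status",
    "StopReason",
    "FailureMessages",
    "ProvisionData",
)


def summarize_tasks(document: str) -> List[Dict[str, Any]]:
    """Reduce `describe-replication-tasks` JSON output to a compact summary.

    Hypothetical helper. Like the jq filter, absent fields are reported
    as null (`None`) instead of raising an error.
    """
    payload = json.loads(document)
    return [
        {field: task.get(field) for field in FIELDS}
        for task in payload.get("ReplicationTasks", [])
    ]
```

To use it, capture the CLI output, for example with `subprocess.run`, and pass
the resulting text to `summarize_tasks`.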
Start replication task with given ARN.
```shell
aws dms start-replication-task \
--start-replication-task-type start-replication --replication-task-arn \
arn:aws:dms:eu-central-1:831394476016:task:7QBLNBTPCNDEBG7CHI3WA73YFA
```
Stop replication task with given ARN.
```shell
aws dms stop-replication-task --replication-task-arn \
arn:aws:dms:eu-central-1:831394476016:task:7QBLNBTPCNDEBG7CHI3WA73YFA
```


## Logging
To follow the progress of the replication process in detail, use CloudWatch to
inspect the corresponding log output.

Enumerate all log groups.
```shell
aws logs describe-log-groups
```

Get log output history.
```shell
aws logs get-log-events \
--log-group-name dms-tasks-testdrive-dms-instance \
--log-stream-name dms-task-7QBLNBTPCNDEBG7CHI3WA73YFA | jq '.events[].message'
```

Start watching the log output using the `start-live-tail` CloudWatch operation.
```shell
aws logs start-live-tail --log-group-identifiers \
arn:aws:logs:eu-central-1:831394476016:log-group:/aws/rds/instance/testdrive-dms-postgresql-dev-db/postgresql \
arn:aws:logs:eu-central-1:831394476016:log-group:dms-tasks-testdrive-dms-instance
```


## CloudFormation
When the CloudFormation deployment is stuck, or if you want to start from scratch,
these commands are useful. The first one resumes a stalled update rollback, the
second one deletes the stack.
```shell
aws cloudformation continue-update-rollback --stack-name testdrive-dms-postgresql-dev
aws cloudformation delete-stack --stack-name testdrive-dms-postgresql-dev
```
135 changes: 135 additions & 0 deletions doc/carabas/dms/index.md
@@ -0,0 +1,135 @@
(aws-dms)=
# Pipelines with AWS DMS

_AWS DMS to Kinesis to CrateDB._

## What's Inside
- [Working with AWS DMS tasks]
- [Using Amazon Kinesis Data Streams as a target for AWS Database Migration Service]
- An IaC driver program based on [AWS CloudFormation] technologies using the
[cottonformation] Python API. It can be used to set up infrastructure on AWS
without much ado.
- DMS: Full load and CDC
- DMS Source: RDS PostgreSQL
- DMS Target: Amazon Kinesis
- CDC Target: CrateDB Cloud


## AWS Infrastructure Setup
The following walkthrough describes a full deployment of AWS DMS including
relevant outbound data processors for demonstration purposes.

In order to run it in production, you are welcome to derive from it and tweak
it for your own purposes. YMMV. If you need support, don't hesitate to ask for
help.

### Install
Install LorryStream.
```shell
pip install --upgrade 'lorrystream[carabas]'
```
Acquire IaC driver program.
```shell
wget https://github.com/daq-tools/lorrystream/raw/main/examples/aws/rds_postgresql_kinesis_lambda_oci_cratedb.py
```

### Configure
Please configure endpoint and replication settings within the source code
of the IaC program you just acquired.

### Deploy
First, prepare an AWS ECR repository for publishing the OCI image that includes your
downstream processor element, which consumes the replication data stream from
Amazon Kinesis and writes it into CrateDB. To learn how this works, please
visit the documentation section about the [](project:#ecr-repository).

Configure CrateDB database sink address.
```shell
export SINK_SQLALCHEMY_URL='crate://admin:dZ..qB@example.eks1.eu-west-1.aws.cratedb.net:4200/?ssl=true'
```
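Before deploying, it can help to sanity-check the sink address. The following
is a minimal sketch using only the Python standard library. `describe_sink` is
a hypothetical helper, not a LorryStream function, and it only inspects the URL
shape shown above. It deliberately leaves the password out of its result.

```python
from typing import Any, Dict
from urllib.parse import parse_qs, urlsplit


def describe_sink(url: str) -> Dict[str, Any]:
    """Split a sink URL into its parts, omitting the password (hypothetical helper)."""
    parts = urlsplit(url)
    query = parse_qs(parts.query)
    return {
        "scheme": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        "username": parts.username,
        # `?ssl=true` in the query string enables TLS in the example above.
        "ssl": query.get("ssl", ["false"])[0].lower() == "true",
    }
```

For example, `describe_sink(os.environ["SINK_SQLALCHEMY_URL"])` lets you
confirm host, port, and the `ssl` flag without echoing credentials.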

Invoke the IaC driver program in order to deploy relevant resources on AWS.
```shell
python examples/aws/rds_postgresql_kinesis_lambda_oci_cratedb.py
```

After the deployment has succeeded, you will be presented with a response that includes
relevant information about the entry points to the software stack you have just created.
```text
Result of CloudFormation deployment:
psql command: psql "postgresql://dynapipe:secret11@testdrive-dms-postgresql-dev-db.czylftvqn1ed.eu-central-1.rds.amazonaws.com:5432/postgres"
RDS Instance ARN: arn:aws:rds:eu-central-1:831394476016:db:testdrive-dms-postgresql-dev-db
Stream ARN: arn:aws:kinesis:eu-central-1:831394476016:stream/testdrive-dms-postgresql-dev-stream
Replication ARN: arn:aws:dms:eu-central-1:831394476016:replication-config:EAM3JEHXGBGZBPN5PLON7NPDEE
```

:::{note}
Please note this is a demonstration stack, deviating from typical real-world situations.

- This stack includes an RDS PostgreSQL instance. In a real-world scenario, a database
  instance will usually already be up and running, so the remaining task is just to
  configure the Kinesis Data Stream and consume it.

- This stack uses AWS Lambda to host the downstream processor element. When aiming for
  better cost-effectiveness, you will run the corresponding code in a dedicated
  computing environment.
:::


## Operations
Please consult the [](project:#aws-dms-handbook) to learn about commands
suitable for operating the AWS DMS engine.

:::{toctree}
:hidden:

handbook
:::



## Usage

### DMS
AWS DMS provides `full-load` and `full-load-and-cdc` migration types.
For a `full-load-and-cdc` task, AWS DMS migrates table data, and then applies
data changes that occur on the source, automatically establishing continuous
replication.

When starting a replication task using [StartReplicationTask], `--start-replication-task-type`
accepts the following values, see also [start-replication-task]:

:start-replication:
The only valid value for the first run of the task when the migration type is
`full-load` or `full-load-and-cdc`.

:resume-processing:
Not applicable for any full-load task, because you can't resume partially loaded
tables during the full load phase. Use it to replicate the changes from the last
stop position.

:reload-target:
For a `full-load-and-cdc` task, load all the tables again, and start capturing
source changes.
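
The rules above can be condensed into a small decision helper. This is a
sketch under stated assumptions: `choose_start_type` and its parameters are
hypothetical names, and the function only encodes the three rules listed in
this section, not the complete AWS DMS behavior.

```python
def choose_start_type(migration_type: str, first_run: bool, reload_target: bool = False) -> str:
    """Pick a value for `--start-replication-task-type` (hypothetical helper)."""
    if first_run:
        # The only valid value for the first run of a `full-load`
        # or `full-load-and-cdc` task.
        return "start-replication"
    if reload_target:
        # Load all tables again, then start capturing source changes.
        # The text above describes this for `full-load-and-cdc` tasks;
        # this sketch does not restrict it any further.
        return "reload-target"
    if migration_type == "full-load":
        # Partially loaded tables cannot be resumed during the full load phase.
        raise ValueError("resume-processing is not applicable to full-load tasks")
    # Replicate the changes from the last stop position.
    return "resume-processing"
```

For example, `choose_start_type("full-load-and-cdc", first_run=False)` selects
resuming from the last stop position.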


## Migration by DMS Source
This section lists details to consider when using your database as a
DMS source element.

:::{toctree}
:maxdepth: 2

postgresql
mysql
:::



[AWS CloudFormation]: https://en.wikipedia.org/wiki/AWS_CloudFormation
[cottonformation]: https://pypi.org/project/cottonformation/
[StartReplicationTask]: https://docs.aws.amazon.com/dms/latest/APIReference/API_StartReplicationTask.html
[start-replication-task]: https://docs.aws.amazon.com/cli/latest/reference/dms/start-replication-task.html
[Using Amazon Kinesis Data Streams as a target for AWS Database Migration Service]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html
[Using object mapping to migrate data to a Kinesis data stream]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html#CHAP_Target.Kinesis.ObjectMapping
[Working with AWS DMS tasks]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Tasks.html
4 changes: 4 additions & 0 deletions doc/carabas/dms/mysql.md
@@ -0,0 +1,4 @@
(aws-dms-mysql)=
# AWS DMS with MySQL/MariaDB source

WIP.
57 changes: 57 additions & 0 deletions doc/carabas/dms/postgresql.md
@@ -0,0 +1,57 @@
(aws-dms-postgresql)=
# AWS DMS with PostgreSQL source

## What's Inside
- [Using a PostgreSQL database as an AWS DMS source]

### Prerequisites
First of all, activate the `pglogical` extension on your RDS PostgreSQL instance.
```sql
CREATE EXTENSION pglogical;
SELECT * FROM pg_catalog.pg_extension WHERE extname='pglogical';
```

```sql
SHOW shared_preload_libraries;
SELECT name, setting FROM pg_settings WHERE name in ('rds.logical_replication','shared_preload_libraries');
```
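
The result of the second query can also be verified programmatically. The
sketch below assumes that logical replication on RDS requires
`rds.logical_replication` to be enabled and `pglogical` to be listed in
`shared_preload_libraries`. Please check this against the AWS guide linked
above. `check_replication_settings` is a hypothetical helper that takes the
`name`/`setting` pairs as a dictionary.

```python
from typing import Dict, List


def check_replication_settings(settings: Dict[str, str]) -> List[str]:
    """Return a list of problems found in the given `pg_settings` values.

    Hypothetical helper. An empty list means both assumed prerequisites hold.
    """
    problems = []
    # Assumption: the flag is reported either as `on` or as `1`.
    if settings.get("rds.logical_replication") not in ("1", "on"):
        problems.append("rds.logical_replication is not enabled")
    # The setting is a comma-separated list of library names.
    libraries = [
        item.strip()
        for item in settings.get("shared_preload_libraries", "").split(",")
    ]
    if "pglogical" not in libraries:
        problems.append("pglogical is missing from shared_preload_libraries")
    return problems
```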


### Data in Source
After that, connect to RDS PostgreSQL, and provision a small amount of data.
```sql
DROP TABLE IF EXISTS foo CASCADE;
CREATE TABLE foo (id INT PRIMARY KEY, name TEXT, age INT, attributes JSONB);
INSERT INTO foo (id, name, age, attributes) VALUES (42, 'John', 30, '{"foo": "bar"}');
INSERT INTO foo (id, name, age, attributes) VALUES (43, 'Jane', 31, '{"baz": "qux"}');
```

### Data in Target
```sql
cr> SELECT * FROM public.foo;
```
```postgresql
+---------------------------------------------------------------------+
| data |
+---------------------------------------------------------------------+
| {"age": 30, "attributes": {"foo": "bar"}, "id": 42, "name": "John"} |
| {"age": 31, "attributes": {"baz": "qux"}, "id": 43, "name": "Jane"} |
+---------------------------------------------------------------------+
```
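
To illustrate how a source row ends up in the single `data` column, here is a
minimal sketch of translating an insert event into an SQL statement. It is not
the translator used by this project. `insert_statement` is a hypothetical
helper, the event layout with `data` and `metadata` keys is an assumption about
the message format DMS writes to Kinesis, and the target table is assumed to
have a single `data OBJECT(DYNAMIC)` column.

```python
from typing import Any, Dict, Tuple


def quote_identifier(name: str) -> str:
    """Double-quote an SQL identifier, escaping embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def insert_statement(event: Dict[str, Any]) -> Tuple[str, Tuple[Dict[str, Any]]]:
    """Translate an insert event into a parameterized SQL statement.

    Hypothetical helper. Assumes `event["metadata"]` carries `record-type`,
    `operation`, `schema-name`, and `table-name`, and `event["data"]` holds
    the row as a dictionary.
    """
    meta = event["metadata"]
    if meta.get("record-type") != "data" or meta.get("operation") not in ("load", "insert"):
        raise ValueError("Only insert events are handled by this sketch")
    table = quote_identifier(meta["schema-name"]) + "." + quote_identifier(meta["table-name"])
    sql = f"INSERT INTO {table} (data) VALUES (?)"
    # The whole row becomes one object parameter. Serializing the dict
    # is left to the database driver, which is an assumption here.
    return sql, (event["data"],)
```

Updates and deletes need the primary key of the record to build a `WHERE`
clause, so they are left out of this sketch.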



```sql
UPDATE foo SET age=32 WHERE name='Jane';
UPDATE foo SET age=33 WHERE id=43;
UPDATE foo SET age=33 WHERE attributes->>'foo'='bar';
UPDATE foo SET attributes = jsonb_set(attributes, '{last_name}', '"Doe"', true) WHERE name='John';
```
```sql
DELETE FROM foo WHERE name='Jane';
DELETE FROM foo WHERE name='John';
```



[Using a PostgreSQL database as an AWS DMS source]: https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Source.PostgreSQL.html
53 changes: 53 additions & 0 deletions doc/carabas/index.md
@@ -0,0 +1,53 @@
# Carabas

A subsystem to divert workloads to other people's computers.
Workloads can be whole pipelines or elements of pipelines.
Provides blended computing environments at your fingertips.

## Etymology
- [Marquis von Carabas]
- [Die Meisterkatze oder der gestiefelte Kater]
- [Le Maître chat ou le Chat botté]
- [Puss in Boots]

## What's Inside

### Kinesis KCL v2
:::{toctree}
:maxdepth: 2
:glob:
kcl/kinesis
:::

### DynamoDB -> Kinesis KCL v2
:::{toctree}
:maxdepth: 2
:glob:
kcl/dynamodb*
:::

### DMS -> Kinesis
:::{toctree}
:maxdepth: 2
dms/index
:::

### Kinesis -> Lambda
:::{toctree}
:maxdepth: 2
lambda/index
:::


## Development
:::{toctree}
:maxdepth: 2
backlog
research
:::


[Die Meisterkatze oder der gestiefelte Kater]: https://de.frwiki.wiki/wiki/Le_Ma%C3%AEtre_chat_ou_le_Chat_bott%C3%A9
[Le Maître chat ou le Chat botté]: https://fr.wikipedia.org/wiki/Le_Ma%C3%AEtre_chat_ou_le_Chat_bott%C3%A9
[Marquis von Carabas]: https://de.frwiki.wiki/wiki/Marquis_de_Carabas
[Puss in Boots]: https://en.wikipedia.org/wiki/Puss_in_Boots