Skip to content

Commit

Permalink
DynamoDB: Get CDC event to SQL translator right, improve KCLv2 launcher
Browse files Browse the repository at this point in the history
- CDC-to-SQL
  - Provides concise interface:
    `DynamoCDCTranslatorCrateDB(table_name="foobar").to_sql(cdc_event)`
  - Uses `boto3.dynamodb.types.TypeDeserializer` to handle all data
    types of DynamoDB without further ado
  - Uses `simplejson` to convert `Decimal` types without further ado

- Improve KCLv2 launcher: Use environment variables for configuration:
  `CDC_SQLALCHEMY_URL`, `CDC_TABLE_NAME`, `CDC_LOGFILE`

- Turn off metrics logging to CloudWatch?

- Update backlog
  • Loading branch information
amotl committed Jul 13, 2024
1 parent 1083b4d commit 17cb85b
Show file tree
Hide file tree
Showing 11 changed files with 313 additions and 148 deletions.
4 changes: 2 additions & 2 deletions lorrystream/dynamodb_cloud/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ crash -c "CREATE TABLE transactions (data OBJECT(DYNAMIC));"
Capture DynamoDB table operations and relay them to a Kinesis stream.
```shell
# Create a Kinesis Data Stream.
aws kinesis create-stream --stream-name dynamodb-cdc --shard-count 4
aws kinesis create-stream --stream-name dynamodb-cdc --shard-count 4

# Check that the Kinesis stream is active.
aws kinesis describe-stream --stream-name dynamodb-cdc
Expand Down Expand Up @@ -149,7 +149,7 @@ export AWS_SECRET_ACCESS_KEY=...
Launch the stream processor, subscribing to the DynamoDB CDC operations feed
over a Kinesis stream.
```shell
$(sh launch.sh dynamodb_cdc_processor.properties)
sh launch.sh dynamodb_cdc_processor.properties
```

Watch actions of the CDC processor.
Expand Down
25 changes: 20 additions & 5 deletions lorrystream/dynamodb_cloud/backlog.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,24 @@
# DynamoDB CDC processing backlog

## Iteration +1
- Improve type mapping.
- Use SQLAlchemy for generating and submitting SQL statement.
- Improve efficiency by using bulk operations when applicable.
- [x] Improve type mapping
- [x] Generalize CDC event -> SQL translator
- [ ] Distill into a Lambda variant
- [ ] Automation!
- [ ] DDL: CREATE TABLE <tablename> (data OBJECT(DYNAMIC));
- [ ] Wrap KCL launcher into manager component

CREATE TABLE transactions (data OBJECT(DYNAMIC));
CREATE TABLE transactions (id INT) WITH (column_policy = 'dynamic');
## Iteration +2
- [ ] Performance improvements (simdjson?)
- [ ] Use SQLAlchemy for generating and submitting SQL statement
- [ ] Improve efficiency by using bulk operations when applicable

## Research
- https://pypi.org/project/core-cdc
- https://github.com/sshd123/pypgoutput
- https://pypi.org/project/pypg-cdc/
- https://github.com/hcevikGA/dynamo-wrapper
- https://pypi.org/project/dynamo-pandas/
- https://aws.amazon.com/de/blogs/opensource/announcing-partiql-one-query-language-for-all-your-data/
- https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ql-reference.html
- https://partiql.org/dql/overview.html
106 changes: 0 additions & 106 deletions lorrystream/dynamodb_cloud/decoder.py

This file was deleted.

6 changes: 6 additions & 0 deletions lorrystream/dynamodb_cloud/dynamodb_cdc_processor.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Configuration file for Kinesis Client Library (KCLv2).
# https://github.com/awslabs/amazon-kinesis-client/blob/v2.6.0/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java#L210-L245

# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
Expand Down Expand Up @@ -81,3 +84,6 @@ regionName = us-east-1
# active threads set to the provided value. If a non-positive integer or no
# value is provided a CachedThreadPool is used.
#maxActiveThreads = 0

# Whether to report metrics to CloudWatch?
metricsLevel = none
87 changes: 65 additions & 22 deletions lorrystream/dynamodb_cloud/dynamodb_cdc_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,48 +8,66 @@
import json
import logging
import logging.handlers as handlers
import os
import time
import typing as t

from amazon_kclpy import kcl
from amazon_kclpy.v3 import processor
from cratedb_toolkit.util import DatabaseAdapter

from lorrystream.dynamodb_cloud.decoder import OpsLogDecoder
from lorrystream.transform.dynamodb import DynamoCDCTranslatorCrateDB

# Logger writes to file because stdout is used by MultiLangDaemon
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s.%(msecs)03d [%(module)s] %(levelname)s %(funcName)s - %(message)s", "%H:%M:%S"
)
handler = handlers.RotatingFileHandler("dynamodb_cdc_processor.log", maxBytes=10**6, backupCount=5)
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)


IntOrNone = t.Union[int, None]
FloatOrNone = t.Union[float, None]


def setup_logging(logfile: str):
"""
Configure Python logger to write to file, because stdout is used by MultiLangDaemon.
"""
logger.setLevel(logging.INFO)
formatter = logging.Formatter(
"%(asctime)s.%(msecs)03d [%(module)s] %(levelname)s %(funcName)s - %(message)s", "%H:%M:%S"
)
handler = handlers.RotatingFileHandler(logfile, maxBytes=10**6, backupCount=5)
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)
logger.addHandler(handler)


class RecordProcessor(processor.RecordProcessorBase):
"""
A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern:
Process data from a shard in a stream. Its methods will be called with this pattern:
* initialize will be called once
* process_records will be called zero or more times
* shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due
a scaling change.
"""

def __init__(self):
def __init__(self, sqlalchemy_url: t.Optional[str], table_name: t.Optional[str]):
self._SLEEP_SECONDS = 5
self._CHECKPOINT_RETRIES = 5
self._CHECKPOINT_FREQ_SECONDS = 60
self._largest_seq: t.Tuple[IntOrNone, IntOrNone] = (None, None)
self._largest_sub_seq = None
self._last_checkpoint_time = None
self.cratedb = DatabaseAdapter(dburi="crate://")
self._last_checkpoint_time: FloatOrNone = None

self.sqlalchemy_url = sqlalchemy_url
self.table_name = table_name

# Sanity checks.
if self.sqlalchemy_url is None:
raise ValueError("SQLAlchemy URL must not be empty")
if self.table_name is None:
raise ValueError("Target CDC table name must not be empty")

self.cratedb = DatabaseAdapter(dburi=self.sqlalchemy_url)
self.table_name = self.table_name
self.cdc = DynamoCDCTranslatorCrateDB(table_name=self.table_name)

def initialize(self, initialize_input):
"""
Expand Down Expand Up @@ -112,13 +130,24 @@ def process_record(self, data, partition_key, sequence_number, sub_sequence_numb
:param int sequence_number: The sequence number associated with this record.
:param int sub_sequence_number: the sub sequence number associated with this record.
"""
cdc_event = json.loads(data)
logger.info("CDC event: %s", cdc_event)

sql = OpsLogDecoder.decode_opslog_item(cdc_event)
logger.info("SQL: %s", sql)
sql = None
try:
cdc_event = json.loads(data)
logger.info("CDC event: %s", cdc_event)

sql = self.cdc.to_sql(cdc_event)
logger.info("SQL: %s", sql)
except Exception:
logger.exception("Decoding CDC event failed")

if not sql:
return

self.cratedb.run_sql(sql)
try:
self.cratedb.run_sql(sql)
except Exception:
logger.exception("Writing CDC event to sink database failed")

def should_update_sequence(self, sequence_number, sub_sequence_number):
"""
Expand Down Expand Up @@ -174,6 +203,20 @@ def shutdown_requested(self, shutdown_requested_input):
shutdown_requested_input.checkpointer.checkpoint()


if __name__ == "__main__":
kcl_process = kcl.KCLProcess(RecordProcessor())
def main():
# Set up logging.
logfile = os.environ.get("CDC_LOGFILE", "cdc.log")
setup_logging(logfile)

# Setup processor.
sqlalchemy_url = os.environ.get("CDC_SQLALCHEMY_URL")
table_name = os.environ.get("CDC_TABLE_NAME")
kcl_processor = RecordProcessor(sqlalchemy_url=sqlalchemy_url, table_name=table_name)

# Invoke machinery.
kcl_process = kcl.KCLProcess(kcl_processor)
kcl_process.run()


if __name__ == "__main__":
main()
16 changes: 15 additions & 1 deletion lorrystream/dynamodb_cloud/launch.sh
Original file line number Diff line number Diff line change
@@ -1 +1,15 @@
python amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties "$1" --log-configuration logback.xml
#!/bin/sh

# Configure record processor.
export CDC_SQLALCHEMY_URL=crate://
export CDC_TABLE_NAME=transactions
export CDC_LOGFILE=dynamodb_cdc_processor.log

# Invoke KCL launcher.
KCLPY_PATH=$(python -c 'import amazon_kclpy; print(amazon_kclpy.__path__[0])')
/usr/bin/java \
-DstreamName=dynamodb-cdc-nested \
-cp "${KCLPY_PATH}/jars/*" \
software.amazon.kinesis.multilang.MultiLangDaemon \
--properties-file "$1" \
--log-configuration logback.xml
2 changes: 2 additions & 0 deletions lorrystream/dynamodb_cloud/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
amazon-kclpy==2.1.5
awscli==1.33.*
boto3<1.35
simplejson<4
Empty file.
Loading

0 comments on commit 17cb85b

Please sign in to comment.