
Commit 31d6744

DynamoDB: Decode CDC event records
1 parent b77b5fa commit 31d6744

5 files changed: +230 -5 lines changed

lorrystream/dynamodb_cloud/README.md

Lines changed: 8 additions & 0 deletions

@@ -79,6 +79,14 @@ aws dynamodb list-tables
 aws dynamodb describe-table --table-name table-testdrive | grep TableStatus
 ```
 
+### CrateDB Table
+The destination table name in CrateDB is currently hard-coded. Please use
+this command to create the `transactions` table, into which the CDC record
+processor will re-materialize CDC events.
+```shell
+crash -c "CREATE TABLE transactions (data OBJECT(DYNAMIC));"
+```
+
 ### Kinesis Stream
 Capture DynamoDB table operations and relay them to a Kinesis stream.
 ```shell

lorrystream/dynamodb_cloud/backlog.md

Lines changed: 9 additions & 0 deletions

@@ -0,0 +1,9 @@
# DynamoDB CDC processing backlog

## Iteration +1
- Improve type mapping.
- Use SQLAlchemy for generating and submitting SQL statements (see the sketch below).
- Improve efficiency by using bulk operations when applicable.

CREATE TABLE transactions (data OBJECT(DYNAMIC));
CREATE TABLE transactions (id INT) WITH (column_policy = 'dynamic');
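The "use SQLAlchemy" item is not part of this commit; the following is only a minimal sketch of what it could look like, handing the SQL produced by `OpsLogDecoder` to an SQLAlchemy engine. The `submit_cdc_event` helper and the `crate://localhost:4200` URL are illustrative, and the `crate://` scheme assumes the CrateDB SQLAlchemy dialect is installed.

```python
# Sketch only: submit decoder output through SQLAlchemy (backlog item, not in this commit).
import sqlalchemy as sa

from lorrystream.dynamodb_cloud.decoder import OpsLogDecoder


def submit_cdc_event(engine: sa.engine.Engine, cdc_event: dict) -> None:
    """Translate one DynamoDB CDC event and run the resulting SQL statement."""
    sql = OpsLogDecoder.decode_opslog_item(cdc_event)
    with engine.begin() as connection:
        connection.execute(sa.text(sql))


# Hypothetical connection URL; adjust to the target CrateDB instance.
engine = sa.create_engine("crate://localhost:4200")
```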

lorrystream/dynamodb_cloud/decoder.py

Lines changed: 106 additions & 0 deletions

@@ -0,0 +1,106 @@
# ruff: noqa: S608
import json
import logging
import typing as t
from collections import OrderedDict

from lorrystream.util.data import asbool

logger = logging.getLogger(__name__)


class OpsLogDecoder:
    """
    Utilities for decoding DynamoDB CDC operations events.
    """

    @classmethod
    def decode_opslog_item(cls, record: t.Dict[str, t.Any]):
        """
        DROP TABLE transactions;
        CREATE TABLE transactions (id INT) WITH (column_policy = 'dynamic');
        CREATE TABLE transactions (data OBJECT(DYNAMIC));

        -- https://www.singlestore.com/blog/cdc-data-from-dynamodb-to-singlestore-using-dynamodb-streams/
        """
        event_source = record.get("eventSource")
        event_name = record.get("eventName")
        if event_source != "aws:dynamodb":
            raise ValueError(f"Unknown eventSource: {event_source}")

        if event_name == "INSERT":
            json_str = json.dumps(cls.materialize_new_image(record["dynamodb"]["NewImage"]))
            sql = f"INSERT INTO transactions (data) VALUES ('{json_str}');".strip()

        elif event_name == "MODIFY":
            key1 = record["dynamodb"]["Keys"]["device"]["S"]
            key2 = record["dynamodb"]["Keys"]["timestamp"]["S"]
            json_str = json.dumps(cls.materialize_new_image(record["dynamodb"]["NewImage"]))
            sql = f"""
            UPDATE transactions
            SET data = '{json_str}'
            WHERE data['device'] = '{key1}' AND data['timestamp'] = '{key2}';""".strip()

        elif event_name == "REMOVE":
            key1 = record["dynamodb"]["Keys"]["device"]["S"]
            key2 = record["dynamodb"]["Keys"]["timestamp"]["S"]
            sql = f"""
            DELETE FROM transactions
            WHERE data['device'] = '{key1}' AND data['timestamp'] = '{key2}';""".strip()

        else:
            raise ValueError(f"Unknown CDC event name: {event_name}")

        return sql

    @classmethod
    def materialize_new_image(cls, item: t.Dict[str, t.Dict[str, str]]) -> t.Dict[str, str]:
        """
        {
            "humidity": {"N": "84.84"},
            "temperature": {"N": "42.42"},
            "device": {"S": "qux"},
            "timestamp": {"S": "2024-07-12T01:17:42"},
        }

        A complete list of DynamoDB data type descriptors:

        S – String
        N – Number
        B – Binary
        BOOL – Boolean
        NULL – Null
        M – Map
        L – List
        SS – String Set
        NS – Number Set
        BS – Binary Set

        """
        out = OrderedDict()
        for key, value_composite in item.items():
            type_: str = list(value_composite.keys())[0]
            value: t.Any = list(value_composite.values())[0]
            if type_ == "S":
                # TODO: Add heuristics for detecting types of timestamps or others?
                pass
            elif type_ == "N":
                value = float(value)
            elif type_ == "B":
                raise NotImplementedError(f"Type not implemented yet: {type_}")
            elif type_ == "BOOL":
                value = asbool(value)
            elif type_ == "NULL":
                value = None
            elif type_ == "M":
                raise NotImplementedError(f"Type not implemented yet: {type_}")
            elif type_ == "L":
                raise NotImplementedError(f"Type not implemented yet: {type_}")
            elif type_ == "SS":
                raise NotImplementedError(f"Type not implemented yet: {type_}")
            elif type_ == "NS":
                raise NotImplementedError(f"Type not implemented yet: {type_}")
            elif type_ == "BS":
                raise NotImplementedError(f"Type not implemented yet: {type_}")
            out[key] = value
        return out
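For reference, a small usage sketch of `materialize_new_image`, using the sample item from its docstring: the DynamoDB type descriptors are unwrapped, `N` values become `float`, and `S` values stay strings. The result noted in the comment is derived from the code above, not from a recorded run.

```python
from lorrystream.dynamodb_cloud.decoder import OpsLogDecoder

item = {
    "humidity": {"N": "84.84"},
    "temperature": {"N": "42.42"},
    "device": {"S": "qux"},
    "timestamp": {"S": "2024-07-12T01:17:42"},
}
print(OpsLogDecoder.materialize_new_image(item))
# An OrderedDict mapping humidity -> 84.84, temperature -> 42.42,
# device -> "qux", timestamp -> "2024-07-12T01:17:42".
```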

lorrystream/dynamodb_cloud/dynamodb_cdc_processor.py

Lines changed: 13 additions & 5 deletions

@@ -5,13 +5,17 @@
 
 from __future__ import print_function
 
+import json
 import logging
 import logging.handlers as handlers
 import time
 import typing as t
 
 from amazon_kclpy import kcl
 from amazon_kclpy.v3 import processor
+from cratedb_toolkit.util import DatabaseAdapter
+
+from lorrystream.dynamodb_cloud.decoder import OpsLogDecoder
 
 # Logger writes to file because stdout is used by MultiLangDaemon
 logger = logging.getLogger(__name__)
@@ -45,6 +49,7 @@ def __init__(self):
         self._largest_seq: t.Tuple[IntOrNone, IntOrNone] = (None, None)
         self._largest_sub_seq = None
         self._last_checkpoint_time = None
+        self.cratedb = DatabaseAdapter(dburi="crate://")
 
     def initialize(self, initialize_input):
         """
@@ -99,18 +104,21 @@ def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None
 
     def process_record(self, data, partition_key, sequence_number, sub_sequence_number):
         """
-        Called for each record that is passed to process_records.
+        Convert the record, which is a DynamoDB CDC event item, into an SQL statement,
+        and submit it to the downstream database.
 
         :param str data: The blob of data that was contained in the record.
         :param str partition_key: The key associated with this record.
         :param int sequence_number: The sequence number associated with this record.
         :param int sub_sequence_number: the sub sequence number associated with this record.
         """
-        ####################################
-        # Insert your processing logic here
-        ####################################
+        cdc_event = json.loads(data)
+        logger.info("CDC event: %s", cdc_event)
+
+        sql = OpsLogDecoder.decode_opslog_item(cdc_event)
+        logger.info("SQL: %s", sql)
 
-        logger.info(data.decode("UTF-8"))
+        self.cratedb.run_sql(sql)
 
     def should_update_sequence(self, sequence_number, sub_sequence_number):
         """
Lines changed: 94 additions & 0 deletions

@@ -0,0 +1,94 @@
from lorrystream.dynamodb_cloud.decoder import OpsLogDecoder

MSG_INSERT = {
    "awsRegion": "us-east-1",
    "eventID": "b015b5f0-c095-4b50-8ad0-4279aa3d88c6",
    "eventName": "INSERT",
    "userIdentity": None,
    "recordFormat": "application/json",
    "tableName": "table-testdrive",
    "dynamodb": {
        "ApproximateCreationDateTime": 1720740233012995,
        "Keys": {"device": {"S": "foo"}, "timestamp": {"S": "2024-07-12T01:17:42"}},
        "NewImage": {
            "humidity": {"N": "84.84"},
            "temperature": {"N": "42.42"},
            "device": {"S": "foo"},
            "timestamp": {"S": "2024-07-12T01:17:42"},
        },
        "SizeBytes": 99,
        "ApproximateCreationDateTimePrecision": "MICROSECOND",
    },
    "eventSource": "aws:dynamodb",
}
MSG_MODIFY = {
    "awsRegion": "us-east-1",
    "eventID": "24757579-ebfd-480a-956d-a1287d2ef707",
    "eventName": "MODIFY",
    "userIdentity": None,
    "recordFormat": "application/json",
    "tableName": "table-testdrive",
    "dynamodb": {
        "ApproximateCreationDateTime": 1720742302233719,
        "Keys": {"device": {"S": "foo"}, "timestamp": {"S": "2024-07-12T01:17:42"}},
        "NewImage": {
            "humidity": {"N": "84.84"},
            "temperature": {"N": "55.66"},
            "device": {"S": "bar"},
            "timestamp": {"S": "2024-07-12T01:17:42"},
        },
        "OldImage": {
            "humidity": {"N": "84.84"},
            "temperature": {"N": "42.42"},
            "device": {"S": "foo"},
            "timestamp": {"S": "2024-07-12T01:17:42"},
        },
        "SizeBytes": 161,
        "ApproximateCreationDateTimePrecision": "MICROSECOND",
    },
    "eventSource": "aws:dynamodb",
}
MSG_REMOVE = {
    "awsRegion": "us-east-1",
    "eventID": "ff4e68ab-0820-4a0c-80b2-38753e8e00e5",
    "eventName": "REMOVE",
    "userIdentity": None,
    "recordFormat": "application/json",
    "tableName": "table-testdrive",
    "dynamodb": {
        "ApproximateCreationDateTime": 1720742321848352,
        "Keys": {"device": {"S": "bar"}, "timestamp": {"S": "2024-07-12T01:17:42"}},
        "OldImage": {
            "humidity": {"N": "84.84"},
            "temperature": {"N": "55.66"},
            "device": {"S": "bar"},
            "timestamp": {"S": "2024-07-12T01:17:42"},
        },
        "SizeBytes": 99,
        "ApproximateCreationDateTimePrecision": "MICROSECOND",
    },
    "eventSource": "aws:dynamodb",
}


def test_decode_insert():
    assert (
        OpsLogDecoder.decode_opslog_item(MSG_INSERT) == "INSERT INTO transactions (data) "
        'VALUES (\'{"humidity": 84.84, "temperature": 42.42, "device": "foo", "timestamp": "2024-07-12T01:17:42"}\');'
    )


def test_decode_modify():
    assert (
        OpsLogDecoder.decode_opslog_item(MSG_MODIFY) == "UPDATE transactions\n            "
        'SET data = \'{"humidity": 84.84, "temperature": 55.66, '
        '"device": "bar", "timestamp": "2024-07-12T01:17:42"}\'\n            '
        "WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42';"
    )


def test_decode_remove():
    assert (
        OpsLogDecoder.decode_opslog_item(MSG_REMOVE) == "DELETE FROM transactions\n            "
        "WHERE data['device'] = 'bar' AND data['timestamp'] = '2024-07-12T01:17:42';"
    )
