diff --git a/lorrystream/dynamodb_cloud/README.md b/lorrystream/dynamodb_cloud/README.md index 7d2a7d9..10fdbc6 100644 --- a/lorrystream/dynamodb_cloud/README.md +++ b/lorrystream/dynamodb_cloud/README.md @@ -91,7 +91,7 @@ crash -c "CREATE TABLE transactions (data OBJECT(DYNAMIC));" Capture DynamoDB table operations and relay them to a Kinesis stream. ```shell # Create a Kinesis Data Stream. -aws kinesis create-stream --stream-name dynamodb-cdc --shard-count 4 +aws kinesis create-stream --stream-name dynamodb-cdc --shard-count 4 # Check that the Kinesis stream is active. aws kinesis describe-stream --stream-name dynamodb-cdc @@ -149,7 +149,7 @@ export AWS_SECRET_ACCESS_KEY=... Launch the stream processor, subscribing to the DynamoDB CDC operations feed over a Kinesis stream. ```shell -$(sh launch.sh dynamodb_cdc_processor.properties) +sh launch.sh dynamodb_cdc_processor.properties ``` Watch actions of the CDC processor. diff --git a/lorrystream/dynamodb_cloud/backlog.md b/lorrystream/dynamodb_cloud/backlog.md index 4e487b7..fb05638 100644 --- a/lorrystream/dynamodb_cloud/backlog.md +++ b/lorrystream/dynamodb_cloud/backlog.md @@ -1,9 +1,24 @@ # DynamoDB CDC processing backlog ## Iteration +1 -- Improve type mapping. -- Use SQLAlchemy for generating and submitting SQL statement. -- Improve efficiency by using bulk operations when applicable. +- [x] Improve type mapping +- [x] Generalize CDC event -> SQL translator +- [ ] Distill into a Lambda variant +- [ ] Automation! + - [ ] DDL: CREATE TABLE (data OBJECT(DYNAMIC)); + - [ ] Wrap KCL launcher into manager component -CREATE TABLE transactions (data OBJECT(DYNAMIC)); -CREATE TABLE transactions (id INT) WITH (column_policy = 'dynamic'); \ No newline at end of file +## Iteration +2 +- [ ] Performance improvements (simdjson?) +- [ ] Use SQLAlchemy for generating and submitting SQL statement +- [ ] Improve efficiency by using bulk operations when applicable + +## Research +- https://pypi.org/project/core-cdc +- https://github.com/sshd123/pypgoutput +- https://pypi.org/project/pypg-cdc/ +- https://github.com/hcevikGA/dynamo-wrapper +- https://pypi.org/project/dynamo-pandas/ +- https://aws.amazon.com/de/blogs/opensource/announcing-partiql-one-query-language-for-all-your-data/ +- https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ql-reference.html +- https://partiql.org/dql/overview.html diff --git a/lorrystream/dynamodb_cloud/decoder.py b/lorrystream/dynamodb_cloud/decoder.py deleted file mode 100644 index 3a65d45..0000000 --- a/lorrystream/dynamodb_cloud/decoder.py +++ /dev/null @@ -1,106 +0,0 @@ -# ruff: noqa: S608 -import json -import logging -import typing as t -from collections import OrderedDict - -from lorrystream.util.data import asbool - -logger = logging.getLogger(__name__) - - -class OpsLogDecoder: - """ - Utilities for decoding DynamoDB CDC operations events. - """ - - @classmethod - def decode_opslog_item(cls, record: t.Dict[str, t.Any]): - """ - DROP TABLE transactions; - CREATE TABLE transactions (id INT) WITH (column_policy = 'dynamic'); - CREATE TABLE transactions (data OBJECT(DYNAMIC)); - - -- https://www.singlestore.com/blog/cdc-data-from-dynamodb-to-singlestore-using-dynamodb-streams/ - """ - event_source = record.get("eventSource") - event_name = record.get("eventName") - if event_source != "aws:dynamodb": - raise ValueError(f"Unknown eventSource: {event_source}") - - if event_name == "INSERT": - json_str = json.dumps(cls.materialize_new_image(record["dynamodb"]["NewImage"])) - sql = f"INSERT INTO transactions (data) VALUES ('{json_str}');".strip() - - elif event_name == "MODIFY": - key1 = record["dynamodb"]["Keys"]["device"]["S"] - key2 = record["dynamodb"]["Keys"]["timestamp"]["S"] - json_str = json.dumps(cls.materialize_new_image(record["dynamodb"]["NewImage"])) - sql = f""" - UPDATE transactions - SET data = '{json_str}' - WHERE data['device'] = '{key1}' AND data['timestamp'] = '{key2}';""".strip() - - elif event_name == "REMOVE": - key1 = record["dynamodb"]["Keys"]["device"]["S"] - key2 = record["dynamodb"]["Keys"]["timestamp"]["S"] - sql = f""" - DELETE FROM transactions - WHERE data['device'] = '{key1}' AND data['timestamp'] = '{key2}';""".strip() - - else: - raise ValueError(f"Unknown CDC event name: {event_name}") - - return sql - - @classmethod - def materialize_new_image(cls, item: t.Dict[str, t.Dict[str, str]]) -> t.Dict[str, str]: - """ - { - "humidity": {"N": "84.84"}, - "temperature": {"N": "42.42"}, - "device": {"S": "qux"}, - "timestamp": {"S": "2024-07-12T01:17:42"}, - } - - A complete list of DynamoDB data type descriptors: - - S – String - N – Number - B – Binary - BOOL – Boolean - NULL – Null - M – Map - L – List - SS – String Set - NS – Number Set - BS – Binary Set - - """ - out = OrderedDict() - for key, value_composite in item.items(): - type_: str = list(value_composite.keys())[0] - value: t.Any = list(value_composite.values())[0] - if type_ == "S": - # TODO: Add heuristics for detecting types of timestamps or others? - pass - elif type_ == "N": - value = float(value) - elif type_ == "B": - raise NotImplementedError(f"Type not implemented yet: {type_}") - elif type_ == "BOOL": - value = asbool(value) - elif type_ == "NULL": - value = None - elif type_ == "M": - raise NotImplementedError(f"Type not implemented yet: {type_}") - elif type_ == "L": - raise NotImplementedError(f"Type not implemented yet: {type_}") - elif type_ == "SS": - raise NotImplementedError(f"Type not implemented yet: {type_}") - elif type_ == "NS": - raise NotImplementedError(f"Type not implemented yet: {type_}") - elif type_ == "BS": - raise NotImplementedError(f"Type not implemented yet: {type_}") - out[key] = value - return out diff --git a/lorrystream/dynamodb_cloud/dynamodb_cdc_processor.properties b/lorrystream/dynamodb_cloud/dynamodb_cdc_processor.properties index 34cb182..a7c698f 100644 --- a/lorrystream/dynamodb_cloud/dynamodb_cdc_processor.properties +++ b/lorrystream/dynamodb_cloud/dynamodb_cdc_processor.properties @@ -1,3 +1,6 @@ +# Configuration file for Kinesis Client Library (KCLv2). +# https://github.com/awslabs/amazon-kinesis-client/blob/v2.6.0/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java#L210-L245 + # The script that abides by the multi-language protocol. This script will # be executed by the MultiLangDaemon, which will communicate with this script # over STDIN and STDOUT according to the multi-language protocol. @@ -81,3 +84,6 @@ regionName = us-east-1 # active threads set to the provided value. If a non-positive integer or no # value is provided a CachedThreadPool is used. #maxActiveThreads = 0 + +# Whether to report metrics to CloudWatch? +metricsLevel = none diff --git a/lorrystream/dynamodb_cloud/dynamodb_cdc_processor.py b/lorrystream/dynamodb_cloud/dynamodb_cdc_processor.py index 1332dc7..ed9a72c 100644 --- a/lorrystream/dynamodb_cloud/dynamodb_cdc_processor.py +++ b/lorrystream/dynamodb_cloud/dynamodb_cdc_processor.py @@ -8,6 +8,7 @@ import json import logging import logging.handlers as handlers +import os import time import typing as t @@ -15,26 +16,31 @@ from amazon_kclpy.v3 import processor from cratedb_toolkit.util import DatabaseAdapter -from lorrystream.dynamodb_cloud.decoder import OpsLogDecoder +from lorrystream.transform.dynamodb import DynamoCDCTranslatorCrateDB -# Logger writes to file because stdout is used by MultiLangDaemon logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -formatter = logging.Formatter( - "%(asctime)s.%(msecs)03d [%(module)s] %(levelname)s %(funcName)s - %(message)s", "%H:%M:%S" -) -handler = handlers.RotatingFileHandler("dynamodb_cdc_processor.log", maxBytes=10**6, backupCount=5) -handler.setLevel(logging.INFO) -handler.setFormatter(formatter) -logger.addHandler(handler) - IntOrNone = t.Union[int, None] +FloatOrNone = t.Union[float, None] + + +def setup_logging(logfile: str): + """ + Configure Python logger to write to file, because stdout is used by MultiLangDaemon. + """ + logger.setLevel(logging.INFO) + formatter = logging.Formatter( + "%(asctime)s.%(msecs)03d [%(module)s] %(levelname)s %(funcName)s - %(message)s", "%H:%M:%S" + ) + handler = handlers.RotatingFileHandler(logfile, maxBytes=10**6, backupCount=5) + handler.setLevel(logging.INFO) + handler.setFormatter(formatter) + logger.addHandler(handler) class RecordProcessor(processor.RecordProcessorBase): """ - A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern: + Process data from a shard in a stream. Its methods will be called with this pattern: * initialize will be called once * process_records will be called zero or more times @@ -42,14 +48,26 @@ class RecordProcessor(processor.RecordProcessorBase): a scaling change. """ - def __init__(self): + def __init__(self, sqlalchemy_url: t.Optional[str], table_name: t.Optional[str]): self._SLEEP_SECONDS = 5 self._CHECKPOINT_RETRIES = 5 self._CHECKPOINT_FREQ_SECONDS = 60 self._largest_seq: t.Tuple[IntOrNone, IntOrNone] = (None, None) self._largest_sub_seq = None - self._last_checkpoint_time = None - self.cratedb = DatabaseAdapter(dburi="crate://") + self._last_checkpoint_time: FloatOrNone = None + + self.sqlalchemy_url = sqlalchemy_url + self.table_name = table_name + + # Sanity checks. + if self.sqlalchemy_url is None: + raise ValueError("SQLAlchemy URL must not be empty") + if self.table_name is None: + raise ValueError("Target CDC table name must not be empty") + + self.cratedb = DatabaseAdapter(dburi=self.sqlalchemy_url) + self.table_name = self.table_name + self.cdc = DynamoCDCTranslatorCrateDB(table_name=self.table_name) def initialize(self, initialize_input): """ @@ -112,13 +130,24 @@ def process_record(self, data, partition_key, sequence_number, sub_sequence_numb :param int sequence_number: The sequence number associated with this record. :param int sub_sequence_number: the sub sequence number associated with this record. """ - cdc_event = json.loads(data) - logger.info("CDC event: %s", cdc_event) - sql = OpsLogDecoder.decode_opslog_item(cdc_event) - logger.info("SQL: %s", sql) + sql = None + try: + cdc_event = json.loads(data) + logger.info("CDC event: %s", cdc_event) + + sql = self.cdc.to_sql(cdc_event) + logger.info("SQL: %s", sql) + except Exception: + logger.exception("Decoding CDC event failed") + + if not sql: + return - self.cratedb.run_sql(sql) + try: + self.cratedb.run_sql(sql) + except Exception: + logger.exception("Writing CDC event to sink database failed") def should_update_sequence(self, sequence_number, sub_sequence_number): """ @@ -174,6 +203,20 @@ def shutdown_requested(self, shutdown_requested_input): shutdown_requested_input.checkpointer.checkpoint() -if __name__ == "__main__": - kcl_process = kcl.KCLProcess(RecordProcessor()) +def main(): + # Set up logging. + logfile = os.environ.get("CDC_LOGFILE", "cdc.log") + setup_logging(logfile) + + # Setup processor. + sqlalchemy_url = os.environ.get("CDC_SQLALCHEMY_URL") + table_name = os.environ.get("CDC_TABLE_NAME") + kcl_processor = RecordProcessor(sqlalchemy_url=sqlalchemy_url, table_name=table_name) + + # Invoke machinery. + kcl_process = kcl.KCLProcess(kcl_processor) kcl_process.run() + + +if __name__ == "__main__": + main() diff --git a/lorrystream/dynamodb_cloud/launch.sh b/lorrystream/dynamodb_cloud/launch.sh index c2b7108..05d7ca5 100644 --- a/lorrystream/dynamodb_cloud/launch.sh +++ b/lorrystream/dynamodb_cloud/launch.sh @@ -1 +1,15 @@ -python amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties "$1" --log-configuration logback.xml +#!/bin/sh + +# Configure record processor. +export CDC_SQLALCHEMY_URL=crate:// +export CDC_TABLE_NAME=transactions +export CDC_LOGFILE=dynamodb_cdc_processor.log + +# Invoke KCL launcher. +KCLPY_PATH=$(python -c 'import amazon_kclpy; print(amazon_kclpy.__path__[0])') +/usr/bin/java \ + -DstreamName=dynamodb-cdc-nested \ + -cp "${KCLPY_PATH}/jars/*" \ + software.amazon.kinesis.multilang.MultiLangDaemon \ + --properties-file "$1" \ + --log-configuration logback.xml diff --git a/lorrystream/dynamodb_cloud/requirements.txt b/lorrystream/dynamodb_cloud/requirements.txt index 457065f..934b940 100644 --- a/lorrystream/dynamodb_cloud/requirements.txt +++ b/lorrystream/dynamodb_cloud/requirements.txt @@ -1,2 +1,4 @@ amazon-kclpy==2.1.5 awscli==1.33.* +boto3<1.35 +simplejson<4 diff --git a/lorrystream/transform/__init__.py b/lorrystream/transform/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lorrystream/transform/dynamodb.py b/lorrystream/transform/dynamodb.py new file mode 100644 index 0000000..9f5caa8 --- /dev/null +++ b/lorrystream/transform/dynamodb.py @@ -0,0 +1,150 @@ +# ruff: noqa: S608 FIXME: Possible SQL injection vector through string-based query construction +import logging +import typing as t + +import simplejson as json +import toolz +from boto3.dynamodb.types import TypeDeserializer + +logger = logging.getLogger(__name__) + + +class DynamoCDCTranslatorBase: + """ + Translate DynamoDB CDC events into different representations. + """ + + def __init__(self): + self.deserializer = TypeDeserializer() + + def deserialize_item(self, item: t.Dict[str, t.Dict[str, str]]) -> t.Dict[str, str]: + """ + Deserialize DynamoDB type-enriched nested JSON snippet into vanilla Python. + + Example: + { + "humidity": {"N": "84.84"}, + "temperature": {"N": "42.42"}, + "device": {"S": "qux"}, + "timestamp": {"S": "2024-07-12T01:17:42"}, + } + + A complete list of DynamoDB data type descriptors: + + S – String + N – Number + B – Binary + BOOL – Boolean + NULL – Null + M – Map + L – List + SS – String Set + NS – Number Set + BS – Binary Set + + -- https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.NamingRulesDataTypes.html#HowItWorks.DataTypeDescriptors + """ + return toolz.valmap(self.deserializer.deserialize, item) + + +class DynamoCDCTranslatorCrateDB(DynamoCDCTranslatorBase): + """ + Translate DynamoDB CDC events into CrateDB SQL statements that materialize them again. + + The SQL DDL schema for CrateDB: + CREATE TABLE (data OBJECT(DYNAMIC)); + + Blueprint: + https://www.singlestore.com/blog/cdc-data-from-dynamodb-to-singlestore-using-dynamodb-streams/ + """ + + # Define name of the column where CDC's record data will get materialized into. + DATA_COLUMN = "data" + + def __init__(self, table_name: str): + super().__init__() + self.table_name = self.quote_table_name(table_name) + + @property + def sql_ddl(self): + """ + Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events. + """ + return f"CREATE TABLE {self.table_name} ({self.DATA_COLUMN} OBJECT(DYNAMIC));" + + def to_sql(self, record: t.Dict[str, t.Any]) -> str: + """ + Produce INSERT|UPDATE|DELETE SQL statement from INSERT|MODIFY|REMOVE CDC event record. + """ + event_source = record.get("eventSource") + event_name = record.get("eventName") + + if event_source != "aws:dynamodb": + raise ValueError(f"Unknown eventSource: {event_source}") + + if event_name == "INSERT": + values_clause = self.image_to_values(record["dynamodb"]["NewImage"]) + sql = f"INSERT INTO {self.table_name} " f"({self.DATA_COLUMN}) " f"VALUES ('{values_clause}');" + + elif event_name == "MODIFY": + values_clause = self.image_to_values(record["dynamodb"]["NewImage"]) + where_clause = self.keys_to_where(record["dynamodb"]["Keys"]) + sql = f"UPDATE {self.table_name} " f"SET {self.DATA_COLUMN} = '{values_clause}' " f"WHERE {where_clause};" + + elif event_name == "REMOVE": + where_clause = self.keys_to_where(record["dynamodb"]["Keys"]) + sql = f"DELETE FROM {self.table_name} " f"WHERE {where_clause};" + + else: + raise ValueError(f"Unknown CDC event name: {event_name}") + + return sql + + def image_to_values(self, image: t.Dict[str, t.Any]) -> str: + """ + Serialize CDC event's "(New|Old)Image" representation to a `VALUES` clause in CrateDB SQL syntax. + + IN (top-level stripped): + "NewImage": { + "humidity": {"N": "84.84"}, + "temperature": {"N": "42.42"}, + "device": {"S": "foo"}, + "timestamp": {"S": "2024-07-12T01:17:42"}, + } + + OUT: + {"humidity": 84.84, "temperature": 42.42, "device": "foo", "timestamp": "2024-07-12T01:17:42"} + """ + return json.dumps(self.deserialize_item(image)) + + def keys_to_where(self, keys: t.Dict[str, t.Dict[str, str]]) -> str: + """ + Serialize CDC event's "Keys" representation to an SQL `WHERE` clause in CrateDB SQL syntax. + + IN (top-level stripped): + "Keys": { + "device": {"S": "foo"}, + "timestamp": {"S": "2024-07-12T01:17:42"}, + } + + OUT: + WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42' + """ + constraints: t.List[str] = [] + for key_name, key_value_raw in keys.items(): + key_value = self.deserializer.deserialize(key_value_raw) + # FIXME: Does the quoting of the value on the right hand side need to take the data type into account? + constraint = f"{self.DATA_COLUMN}['{key_name}'] = '{key_value}'" + constraints.append(constraint) + return " AND ".join(constraints) + + @staticmethod + def quote_table_name(name: str): + """ + Poor man's table quoting. + + TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable. + """ + if '"' not in name: + name = f'"{name}"' + return name diff --git a/pyproject.toml b/pyproject.toml index 181cfd7..71657d5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -87,6 +87,7 @@ dynamic = [ ] dependencies = [ "boltons", + "boto3<1.35", "click<9", "colorama<1", "colorlog", @@ -97,6 +98,7 @@ dependencies = [ "paho-mqtt", "pandas<2.3", "pika<1.4", + "simplejson<4", "sqlalchemy==2.0.*", "sqlalchemy-cratedb==0.38.0", "streamz", diff --git a/lorrystream/dynamodb_cloud/test_decoder.py b/tests/transform/test_dynamodb.py similarity index 56% rename from lorrystream/dynamodb_cloud/test_decoder.py rename to tests/transform/test_dynamodb.py index a58329b..3be916d 100644 --- a/lorrystream/dynamodb_cloud/test_decoder.py +++ b/tests/transform/test_dynamodb.py @@ -1,12 +1,16 @@ -from lorrystream.dynamodb_cloud.decoder import OpsLogDecoder +import decimal -MSG_INSERT = { +from lorrystream.transform.dynamodb import DynamoCDCTranslatorCrateDB + +READING_BASIC = {"device": "foo", "temperature": 42.42, "humidity": 84.84} + +MSG_INSERT_BASIC = { "awsRegion": "us-east-1", "eventID": "b015b5f0-c095-4b50-8ad0-4279aa3d88c6", "eventName": "INSERT", "userIdentity": None, "recordFormat": "application/json", - "tableName": "table-testdrive", + "tableName": "foo", "dynamodb": { "ApproximateCreationDateTime": 1720740233012995, "Keys": {"device": {"S": "foo"}, "timestamp": {"S": "2024-07-12T01:17:42"}}, @@ -21,13 +25,33 @@ }, "eventSource": "aws:dynamodb", } +MSG_INSERT_NESTED = { + "awsRegion": "us-east-1", + "eventID": "b581c2dc-9d97-44ed-94f7-cb77e4fdb740", + "eventName": "INSERT", + "userIdentity": None, + "recordFormat": "application/json", + "tableName": "table-testdrive-nested", + "dynamodb": { + "ApproximateCreationDateTime": 1720800199717446, + "Keys": {"id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}}, + "NewImage": { + "id": {"S": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266"}, + "data": {"M": {"temperature": {"N": "42.42"}, "humidity": {"N": "84.84"}}}, + "meta": {"M": {"timestamp": {"S": "2024-07-12T01:17:42"}, "device": {"S": "foo"}}}, + }, + "SizeBytes": 156, + "ApproximateCreationDateTimePrecision": "MICROSECOND", + }, + "eventSource": "aws:dynamodb", +} MSG_MODIFY = { "awsRegion": "us-east-1", "eventID": "24757579-ebfd-480a-956d-a1287d2ef707", "eventName": "MODIFY", "userIdentity": None, "recordFormat": "application/json", - "tableName": "table-testdrive", + "tableName": "foo", "dynamodb": { "ApproximateCreationDateTime": 1720742302233719, "Keys": {"device": {"S": "foo"}, "timestamp": {"S": "2024-07-12T01:17:42"}}, @@ -54,7 +78,7 @@ "eventName": "REMOVE", "userIdentity": None, "recordFormat": "application/json", - "tableName": "table-testdrive", + "tableName": "foo", "dynamodb": { "ApproximateCreationDateTime": 1720742321848352, "Keys": {"device": {"S": "bar"}, "timestamp": {"S": "2024-07-12T01:17:42"}}, @@ -71,24 +95,39 @@ } -def test_decode_insert(): +def test_decode_ddb_deserialize_type(): + assert DynamoCDCTranslatorCrateDB(table_name="foo").deserialize_item({"foo": {"N": "84.84"}}) == { + "foo": decimal.Decimal("84.84") + } + + +def test_decode_cdc_insert_basic(): assert ( - OpsLogDecoder.decode_opslog_item(MSG_INSERT) == "INSERT INTO transactions (data) " + DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT_BASIC) == 'INSERT INTO "foo" (data) ' 'VALUES (\'{"humidity": 84.84, "temperature": 42.42, "device": "foo", "timestamp": "2024-07-12T01:17:42"}\');' ) -def test_decode_modify(): +def test_decode_cdc_insert_nested(): + assert ( + DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT_NESTED) + == 'INSERT INTO "foo" (data) VALUES (\'{"id": "5F9E-Fsadd41C-4C92-A8C1-70BF3FFB9266", ' + '"data": {"temperature": 42.42, "humidity": 84.84}, ' + '"meta": {"timestamp": "2024-07-12T01:17:42", "device": "foo"}}\');' + ) + + +def test_decode_cdc_modify(): assert ( - OpsLogDecoder.decode_opslog_item(MSG_MODIFY) == "UPDATE transactions\n " + DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_MODIFY) == 'UPDATE "foo" ' 'SET data = \'{"humidity": 84.84, "temperature": 55.66, ' - '"device": "bar", "timestamp": "2024-07-12T01:17:42"}\'\n ' + '"device": "bar", "timestamp": "2024-07-12T01:17:42"}\' ' "WHERE data['device'] = 'foo' AND data['timestamp'] = '2024-07-12T01:17:42';" ) -def test_decode_remove(): +def test_decode_cdc_remove(): assert ( - OpsLogDecoder.decode_opslog_item(MSG_REMOVE) == "DELETE FROM transactions\n " + DynamoCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_REMOVE) == 'DELETE FROM "foo" ' "WHERE data['device'] = 'bar' AND data['timestamp'] = '2024-07-12T01:17:42';" )