|
25 | 25 | # requires-python = ">=3.9"
|
26 | 26 | # dependencies = [
|
27 | 27 | # "commons-codec",
|
28 |
| -# "sqlalchemy-cratedb==0.38.0", |
| 28 | +# "sqlalchemy-cratedb>=0.38.0", |
29 | 29 | # ]
|
30 | 30 | # ///
|
31 | 31 | import base64
|
|
38 | 38 | from commons_codec.exception import UnknownOperationError
|
39 | 39 | from commons_codec.model import ColumnTypeMapStore
|
40 | 40 | from commons_codec.transform.aws_dms import DMSTranslatorCrateDB
|
41 |
| -from commons_codec.transform.dynamodb import DynamoCDCTranslatorCrateDB |
| 41 | +from commons_codec.transform.dynamodb import DynamoDBCDCTranslator |
42 | 42 | from sqlalchemy.util import asbool
|
43 | 43 |
|
44 | 44 | LOG_LEVEL: str = os.environ.get("LOG_LEVEL", "INFO")
|
|
79 | 79 | if MESSAGE_FORMAT == "dms":
|
80 | 80 | cdc = DMSTranslatorCrateDB(column_types=column_types)
|
81 | 81 | elif MESSAGE_FORMAT == "dynamodb":
|
82 |
| - cdc = DynamoCDCTranslatorCrateDB(table_name=SINK_TABLE) |
| 82 | + cdc = DynamoDBCDCTranslator(table_name=SINK_TABLE) |
83 | 83 |
|
84 | 84 | # Create the database connection outside the handler to allow
|
85 | 85 | # connections to be re-used by subsequent function invocations.
|
@@ -121,8 +121,8 @@ def handler(event, context):
|
121 | 121 | logger.debug(f"Record Data: {record_data}")
|
122 | 122 |
|
123 | 123 | # Process record.
|
124 |
| - sql = cdc.to_sql(record_data) |
125 |
| - connection.execute(sa.text(sql)) |
| 124 | + operation = cdc.to_sql(record_data) |
| 125 | + connection.execute(sa.text(operation.statement), operation.parameters) |
126 | 126 | connection.commit()
|
127 | 127 |
|
128 | 128 | # Bookkeeping.
|
|
0 commit comments