From b304e7a36e7293fc12c598cde83e5ad31bf4ee52 Mon Sep 17 00:00:00 2001 From: Andreas Motl Date: Tue, 16 Jul 2024 20:33:19 +0200 Subject: [PATCH] Transform: MongoDB CDC event to CrateDB SQL --- .github/workflows/tests.yml | 64 +++++++- CHANGES.md | 1 + README.md | 6 + docs/mongodb.md | 101 ++++++++++++ examples/mongodb_cdc_cratedb.py | 106 +++++++++++++ pyproject.toml | 12 +- src/commons_codec/transform/mongodb.py | 203 +++++++++++++++++++++++++ tests/transform/test_mongodb.py | 157 +++++++++++++++++++ 8 files changed, 644 insertions(+), 6 deletions(-) create mode 100644 docs/mongodb.md create mode 100644 examples/mongodb_cdc_cratedb.py create mode 100644 src/commons_codec/transform/mongodb.py create mode 100644 tests/transform/test_mongodb.py diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 74b55c7..cb8e93b 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -1,4 +1,4 @@ -name: "Tests: PR" +name: "Tests" on: push: @@ -11,9 +11,10 @@ concurrency: cancel-in-progress: true jobs: - test: + + test-vanilla: name: " - Python: ${{ matrix.python-version }} + Vanilla: Python ${{ matrix.python-version }} " runs-on: ${{ matrix.os }} strategy: @@ -60,7 +61,62 @@ jobs: CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} with: files: ./coverage.xml - flags: main + flags: vanilla + env_vars: OS,PYTHON + name: codecov-umbrella + fail_ci_if_error: true + + + test-mongodb: + name: " + MongoDB: Python ${{ matrix.python-version }} + " + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: ['ubuntu-latest'] + python-version: ['3.7', '3.8', '3.9', '3.10', '3.11', '3.12'] + + env: + OS: ${{ matrix.os }} + PYTHON: ${{ matrix.python-version }} + + steps: + + - name: Acquire sources + uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + architecture: x64 + cache: 'pip' + cache-dependency-path: + pyproject.toml + + - name: Set up project + run: | + + # `setuptools 0.64.0` adds support for editable install hooks (PEP 660). + # https://github.com/pypa/setuptools/blob/main/CHANGES.rst#v6400 + pip install "setuptools>=64" --upgrade + + # Install package in editable mode. + pip install --use-pep517 --prefer-binary --editable=.[develop,test,mongodb] + + - name: Run linters and software tests + run: poe check + + # https://github.com/codecov/codecov-action + - name: Upload coverage results to Codecov + uses: codecov/codecov-action@v4 + env: + CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }} + with: + files: ./coverage.xml + flags: mongodb env_vars: OS,PYTHON name: codecov-umbrella fail_ci_if_error: true diff --git a/CHANGES.md b/CHANGES.md index 9fac4c7..9917001 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,7 @@ # Changelog ## Unreleased +- Added transformer for MongoDB CDC to CrateDB SQL conversion ## 2024/07/16 v0.0.1 - Added decoders for Airrohr, Tasmota, and TTS/TTN from Kotori DAQ diff --git a/README.md b/README.md index ee86aa1..eb22042 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,10 @@ To install the most recent version, run: pip install --upgrade commons-codec ``` +## Usage +In order to learn how to use the library, please visit the [documentation], +and explore the source code or its [examples]. + ## License The project uses the LGPLv3 license for the whole ensemble. However, individual portions of the code base are vendorized from other Python packages, where @@ -50,6 +54,8 @@ both libraries' ingredients don't have anything in common, yet. [Apache Commons Codec]: https://commons.apache.org/proper/commons-codec/ [commons-codec]: https://pypi.org/project/commons-codec/ +[documentation]: https://github.com/daq-tools/commons-codec/tree/main/docs +[examples]: https://github.com/daq-tools/commons-codec/tree/main/examples [Kotori]: https://github.com/daq-tools/kotori [LorryStream]: https://github.com/daq-tools/lorrystream/ [PyPI]: https://pypi.org/ diff --git a/docs/mongodb.md b/docs/mongodb.md new file mode 100644 index 0000000..30f731a --- /dev/null +++ b/docs/mongodb.md @@ -0,0 +1,101 @@ +# Relay MongoDB Change Stream into CrateDB + +## About +[mongodb_cdc_cratedb.py] demonstrates a basic example program to relay event +records from [MongoDB Change Streams] into [CrateDB]. + +> Change streams allow applications to access real-time data changes without the prior +> complexity and risk of manually tailing the oplog. Applications can use change streams +> to subscribe to all data changes on a single collection, a database, or an entire +> deployment, and immediately react to them. +> +> - https://www.mongodb.com/docs/manual/changeStreams/ +> - https://www.mongodb.com/developer/languages/python/python-change-streams/ + + +## Services + +### CrateDB +Start CrateDB. +```shell +docker run --rm -it --name=cratedb --publish=4200:4200 --env=CRATE_HEAP_SIZE=2g \ + crate:5.7 -Cdiscovery.type=single-node +``` + +### MongoDB +Start MongoDB. +Please note that change streams are only available for replica sets and +sharded clusters, so let's define a replica set by using the +`--replSet rs-testdrive` option when starting the MongoDB server. +```shell +docker run -it --rm --name=mongodb --publish=27017:27017 \ + mongo:7 mongod --replSet rs-testdrive +``` + +Now, initialize the replica set, by using the `mongosh` command to invoke +the `rs.initiate()` operation. +```shell +export MONGODB_URL="mongodb://localhost/" +docker run -i --rm --network=host mongo:7 mongosh ${MONGODB_URL} < t.Dict[str, str]: + """ + Deserialize MongoDB type-enriched nested JSON snippet into vanilla Python. + + Example: + { + "_id": ObjectId("669683c2b0750b2c84893f3e"), + "id": "5F9E", + "data": {"temperature": 42.42, "humidity": 84.84}, + "meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"}, + } + """ + return _json_convert(item) + + +class MongoDBCDCTranslatorCrateDB(MongoDBCDCTranslatorBase): + """ + Translate MongoDB CDC events into CrateDB SQL statements that materialize them again. + + Please note that change streams are only available for replica sets and sharded clusters. + + Accepted events: insert, update, replace, delete + Ignored events: drop, invalidate + + The current implementation uses the `fullDocument` representation to update records + in the sink database table. In order to receive them on `update` events as well, you + need to subscribe to change events using `watch(full_document="updateLookup")`. + + The MongoDB documentation has a few remarks about the caveats of this approach: + + > Updates with the `fullDocument` Option: The `fullDocument` option for Update Operations + > does not guarantee the returned document does not include further changes. In contrast + > to the document deltas that are guaranteed to be sent in order with update notifications, + > there is no guarantee that the `fullDocument` returned represents the document as it was + > exactly after the operation. + > + > `updateLookup` will poll the current version of the document. If changes happen quickly + > it is possible that the document was changed before the updateLookup finished. This means + > that the `fullDocument` might not represent the document at the time of the event thus + > potentially giving the impression events took place in a different order. + > + > -- https://www.mongodb.com/developer/languages/python/python-change-streams/ + + The SQL DDL schema for CrateDB: + CREATE TABLE (oid TEXT, data OBJECT(DYNAMIC)); + """ + + # Define name of the column where MongoDB's OID for a document will be stored. + ID_COLUMN = "oid" + + # Define name of the column where CDC's record data will get materialized into. + DATA_COLUMN = "data" + + def __init__(self, table_name: str): + super().__init__() + self.table_name = self.quote_table_name(table_name) + + @property + def sql_ddl(self): + """ + Define SQL DDL statement for creating table in CrateDB that stores re-materialized CDC events. + """ + return ( + f"CREATE TABLE IF NOT EXISTS {self.table_name} ({self.ID_COLUMN} TEXT, {self.DATA_COLUMN} OBJECT(DYNAMIC));" + ) + + def to_sql(self, record: t.Dict[str, t.Any]) -> str: + """ + Produce INSERT|UPDATE|DELETE SQL statement from insert|update|replace|delete CDC event record. + """ + + if "operationType" in record and record["operationType"]: + operation_type: str = str(record["operationType"]) + else: + raise ValueError(f"Operation Type missing or empty: {record}") + + if operation_type == "insert": + oid: str = self.get_document_key(record) + full_document = self.get_full_document(record) + values_clause = self.full_document_to_values(full_document) + sql = ( + f"INSERT INTO {self.table_name} " + f"({self.ID_COLUMN}, {self.DATA_COLUMN}) " + f"VALUES ('{oid}', '{values_clause}');" + ) + + # In order to use "full document" representations from "update" events, + # you need to use `watch(full_document="updateLookup")`. + # https://www.mongodb.com/docs/manual/changeStreams/#lookup-full-document-for-update-operations + elif operation_type in ["update", "replace"]: + full_document = self.get_full_document(record) + values_clause = self.full_document_to_values(full_document) + where_clause = self.where_clause(record) + sql = f"UPDATE {self.table_name} SET {self.DATA_COLUMN} = '{values_clause}' WHERE {where_clause};" + + elif operation_type == "delete": + where_clause = self.where_clause(record) + sql = f"DELETE FROM {self.table_name} WHERE {where_clause};" + + # TODO: Enable applying the "drop" operation conditionally when enabled. + elif operation_type == "drop": + logger.info("Received 'drop' operation, but skipping to apply 'DROP TABLE'") + sql = "" + + elif operation_type == "invalidate": + logger.info("Ignoring 'invalidate' CDC operation") + sql = "" + + else: + raise ValueError(f"Unknown CDC operation type: {operation_type}") + + return sql + + @staticmethod + def get_document_key(record: t.Dict[str, t.Any]) -> str: + """ + Return value of document key (MongoDB document OID) from CDC record. + + "documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")} + """ + return str(record.get("documentKey", {}).get("_id")) + + @staticmethod + def get_full_document(record: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: + """ + return `fullDocument` representation from record. + """ + return t.cast(dict, record.get("fullDocument")) + + def full_document_to_values(self, document: t.Dict[str, t.Any]) -> str: + """ + Serialize CDC event's "fullDocument" representation to a `VALUES` clause in CrateDB SQL syntax. + + IN (top-level stripped): + "fullDocument": { + "_id": ObjectId("669683c2b0750b2c84893f3e"), + "id": "5F9E", + "data": {"temperature": 42.42, "humidity": 84.84}, + "meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"}, + } + + OUT: + {"_id": {"$oid": "669683c2b0750b2c84893f3e"}, + "id": "5F9E", + "data": {"temperature": 42.42, "humidity": 84.84}, + "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"}, + } + """ + return json.dumps(self.deserialize_item(document)) + + def where_clause(self, record: t.Dict[str, t.Any]) -> str: + """ + When converging an oplog of a MongoDB collection, the primary key is always the MongoDB document OID. + + IN (top-level stripped): + "documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")} + + OUT: + WHERE oid = '669683c2b0750b2c84893f3e' + """ + oid = self.get_document_key(record) + return f"oid = '{oid}'" + + @staticmethod + def quote_table_name(name: str): + """ + Poor man's table quoting. + + TODO: Better use or vendorize canonical table quoting function from CrateDB Toolkit, when applicable. + """ + if '"' not in name: + name = f'"{name}"' + return name diff --git a/tests/transform/test_mongodb.py b/tests/transform/test_mongodb.py new file mode 100644 index 0000000..4ca48ac --- /dev/null +++ b/tests/transform/test_mongodb.py @@ -0,0 +1,157 @@ +# ruff: noqa: E402, E501 +import datetime + +import pytest + +pytest.importorskip("pymongo") + +from bson import ObjectId, Timestamp +from commons_codec.transform.mongodb import MongoDBCDCTranslatorCrateDB + +MSG_OPERATION_UNKNOWN = { + "operationType": "foobar", +} +MSG_OPERATION_MISSING = {} +MSG_OPERATION_EMPTY = { + "operationType": "", +} + +MSG_INSERT = { + "_id": { + "_data": "82669683C2000000022B042C0100296E5A1004413F85D5B4CF4680AA4D17641E9DF22D463C6F7065726174696F6E54797065003C696E736572740046646F63756D656E744B65790046645F69640064669683C2B0750B2C84893F3E000004" + }, + "operationType": "insert", + "clusterTime": Timestamp(1721140162, 2), + "wallTime": datetime.datetime(2024, 7, 16, 14, 29, 22, 907000), + "fullDocument": { + "_id": ObjectId("669683c2b0750b2c84893f3e"), + "id": "5F9E", + "data": {"temperature": 42.42, "humidity": 84.84}, + "meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"}, + }, + "ns": {"db": "testdrive", "coll": "data"}, + "documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")}, +} +MSG_UPDATE = { + "_id": { + "_data": "82669683C2000000032B042C0100296E5A1004413F85D5B4CF4680AA4D17641E9DF22D463C6F7065726174696F6E54797065003C7570646174650046646F63756D656E744B65790046645F69640064669683C2B0750B2C84893F3E000004" + }, + "operationType": "update", + "clusterTime": Timestamp(1721140162, 3), + "wallTime": datetime.datetime(2024, 7, 16, 14, 29, 22, 910000), + "fullDocument": { + "_id": ObjectId("669683c2b0750b2c84893f3e"), + "id": "5F9E", + "data": {"temperature": 42.5}, + "meta": {"timestamp": datetime.datetime(2024, 7, 11, 23, 17, 42), "device": "foo"}, + }, + "ns": {"db": "testdrive", "coll": "data"}, + "documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")}, + "updateDescription": {"updatedFields": {"data": {"temperature": 42.5}}, "removedFields": [], "truncatedArrays": []}, +} +MSG_REPLACE = { + "_id": { + "_data": "82669683C2000000042B042C0100296E5A1004413F85D5B4CF4680AA4D17641E9DF22D463C6F7065726174696F6E54797065003C7265706C6163650046646F63756D656E744B65790046645F69640064669683C2B0750B2C84893F3E000004" + }, + "operationType": "replace", + "clusterTime": Timestamp(1721140162, 4), + "wallTime": datetime.datetime(2024, 7, 16, 14, 29, 22, 911000), + "fullDocument": {"_id": ObjectId("669683c2b0750b2c84893f3e"), "tags": ["deleted"]}, + "ns": {"db": "testdrive", "coll": "data"}, + "documentKey": {"_id": ObjectId("669683c2b0750b2c84893f3e")}, +} +MSG_DELETE = { + "_id": { + "_data": "82669693C5000000032B042C0100296E5A10043D9AA2FA889C45049D2CDE4175242B7E463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B65790046645F69640064669693C5002EF91EA9C7A562000004" + }, + "operationType": "delete", + "clusterTime": Timestamp(1721144261, 3), + "wallTime": datetime.datetime(2024, 7, 16, 15, 37, 41, 831000), + "ns": {"db": "testdrive", "coll": "data"}, + "documentKey": {"_id": ObjectId("669693c5002ef91ea9c7a562")}, +} +MSG_DROP = { + "_id": { + "_data": "82669683C2000000052B042C0100296E5A1004413F85D5B4CF4680AA4D17641E9DF22D463C6F7065726174696F6E54797065003C64726F70000004" + }, + "operationType": "drop", + "clusterTime": Timestamp(1721140162, 5), + "wallTime": datetime.datetime(2024, 7, 16, 14, 29, 22, 914000), + "ns": {"db": "testdrive", "coll": "data"}, +} + +MSG_INVALIDATE = { + "_id": { + "_data": "82669683C2000000052B042C0100296F5A1004413F85D5B4CF4680AA4D17641E9DF22D463C6F7065726174696F6E54797065003C64726F70000004" + }, + "operationType": "invalidate", + "clusterTime": Timestamp(1721140162, 5), + "wallTime": datetime.datetime(2024, 7, 16, 14, 29, 22, 914000), +} + + +def test_decode_cdc_sql_ddl(): + assert ( + MongoDBCDCTranslatorCrateDB(table_name="foo").sql_ddl + == 'CREATE TABLE IF NOT EXISTS "foo" (oid TEXT, data OBJECT(DYNAMIC));' + ) + + +def test_decode_cdc_unknown_event(): + with pytest.raises(ValueError) as ex: + MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_OPERATION_UNKNOWN) + assert ex.match("Unknown CDC operation type: foobar") + + +def test_decode_cdc_optype_missing(): + with pytest.raises(ValueError) as ex: + MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_OPERATION_MISSING) + assert ex.match("Operation Type missing or empty: {}") + + +def test_decode_cdc_optype_empty(): + with pytest.raises(ValueError) as ex: + MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_OPERATION_EMPTY) + assert ex.match("Operation Type missing or empty: {'operationType': ''}") + + +def test_decode_cdc_insert(): + assert ( + MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INSERT) == 'INSERT INTO "foo" (oid, data) ' + 'VALUES (\'669683c2b0750b2c84893f3e\', \'{"_id": {"$oid": "669683c2b0750b2c84893f3e"}, "id": "5F9E", ' + '"data": {"temperature": 42.42, "humidity": 84.84}, ' + '"meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"}}\');' + ) + + +def test_decode_cdc_update(): + assert ( + MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_UPDATE) + == """UPDATE "foo" SET data = '{"_id": {"$oid": "669683c2b0750b2c84893f3e"}, "id": "5F9E", """ + """"data": {"temperature": 42.5}, "meta": {"timestamp": {"$date": "2024-07-11T23:17:42Z"}, "device": "foo"}}' """ + "WHERE oid = '669683c2b0750b2c84893f3e';" + ) + + +def test_decode_cdc_replace(): + assert ( + MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_REPLACE) + == """UPDATE "foo" SET data = '{"_id": {"$oid": "669683c2b0750b2c84893f3e"}, """ + """"tags": ["deleted"]}' """ + "WHERE oid = '669683c2b0750b2c84893f3e';" + ) + + +def test_decode_cdc_delete(): + assert ( + MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_DELETE) + == """DELETE FROM "foo" WHERE oid = '669693c5002ef91ea9c7a562';""" + ) + + +def test_decode_cdc_drop(): + assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_DROP) == "" + + +def test_decode_cdc_invalidate(): + assert MongoDBCDCTranslatorCrateDB(table_name="foo").to_sql(MSG_INVALIDATE) == ""