From f9a59ba811ea8fc8611b26c3a9315ccb19407e9d Mon Sep 17 00:00:00 2001 From: Mark Johnston Date: Thu, 24 Oct 2024 13:09:06 +1300 Subject: [PATCH] Handle timestamp incremental replication (#89) * Handle timestamp (nothing to do with datetime -- SQL Server rowversion) incremental replication * Release 2.6.4 prep --- .bumpversion.cfg | 2 +- CHANGELOG.md | 4 + pyproject.toml | 2 +- tap_mssql/__init__.py | 6 +- tap_mssql/sync_strategies/incremental.py | 14 +++- tests/test_tap_mssql.py | 94 ++++++++++++++++++++++-- 6 files changed, 108 insertions(+), 14 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index b4a0fc3c..4442f027 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 2.6.3 +current_version = 2.6.4 parse = (?P\d+) \.(?P\d+) \.(?P\d+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b92ffe5..7827836a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# tap-mssql 2.6.4 2024-10-24 +* Update to handle `timestamp` (not a datetime value, a [deprecated](https://learn.microsoft.com/en-us/sql/t-sql/data-types/rowversion-transact-sql?view=sql-server-ver16#remarks) synonym of internal `rowversion`) as string +* Add tests for incremental syncing using a `timestamp` column as `replication-key` + # tap-mssql 2.6.3 2024-10-17 * Updating CDC documentation with a packaged method to maintain CDC tables. diff --git a/pyproject.toml b/pyproject.toml index ac3f44be..bda38b2e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "tap-mssql" -version = "2.6.3" +version = "2.6.4" description = "A pipelinewise compatible tap for connecting Microsoft SQL Server" authors = ["Rob Winters "] license = "GNU Affero" diff --git a/tap_mssql/__init__.py b/tap_mssql/__init__.py index dee28184..d813870f 100644 --- a/tap_mssql/__init__.py +++ b/tap_mssql/__init__.py @@ -66,7 +66,7 @@ DECIMAL_TYPES = set(["decimal", "number", "money", "smallmoney", "numeric"]) -DATETIME_TYPES = set(["datetime2", "datetime", "datetimeoffset", "timestamp", "smalldatetime"]) +DATETIME_TYPES = set(["datetime2", "datetime", "datetimeoffset", "smalldatetime"]) DATE_TYPES = set(["date"]) @@ -101,6 +101,10 @@ def schema_for_column(c, config): if data_type == "bit": result.type = ["null", "boolean"] + + elif data_type in ["timestamp", "rowversion"]: + result.type = ["null", "string"] + result.format = "rowversion" elif data_type in BYTES_FOR_INTEGER_TYPE: result.type = ["null", "integer"] diff --git a/tap_mssql/sync_strategies/incremental.py b/tap_mssql/sync_strategies/incremental.py index e1ed41d9..8cedf4b7 100755 --- a/tap_mssql/sync_strategies/incremental.py +++ b/tap_mssql/sync_strategies/incremental.py @@ -57,11 +57,19 @@ def sync_table(mssql_conn, config, catalog_entry, state, columns): replication_key_value = datetime.fromtimestamp( pendulum.parse(replication_key_value).timestamp() ) + # Handle timestamp incremental (timestamp) + if catalog_entry.schema.properties[replication_key_metadata].format == 'rowversion': + select_sql += """ WHERE CAST("{}" AS BIGINT) >= + convert(bigint, convert (varbinary(8), '0x{}', 1)) + ORDER BY "{}" ASC""".format( + replication_key_metadata, replication_key_value, replication_key_metadata + ) + else: + select_sql += ' WHERE "{}" >= %(replication_key_value)s ORDER BY "{}" ASC'.format( + replication_key_metadata, replication_key_metadata + ) - select_sql += ' WHERE "{}" >= %(replication_key_value)s ORDER BY "{}" ASC'.format( - replication_key_metadata, replication_key_metadata - ) params["replication_key_value"] = replication_key_value elif replication_key_metadata is not None: diff --git a/tests/test_tap_mssql.py b/tests/test_tap_mssql.py index 59c8aeae..397310b1 100755 --- a/tests/test_tap_mssql.py +++ b/tests/test_tap_mssql.py @@ -561,15 +561,15 @@ def test_with_no_state(self): (message_types, versions) = message_types_and_versions(SINGER_MESSAGES) - self.assertEqual( - [ - "ActivateVersionMessage", - "RecordMessage", - ], - sorted(list(set(message_types))), - ) + self.assertTrue(isinstance(versions[0], int)) self.assertEqual(versions[0], versions[1]) + record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)] + incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-incremental'] + integer_incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-integer_incremental'] + + self.assertEqual(len(incremental_record_messages),3) + self.assertEqual(len(integer_incremental_record_messages),3) def test_with_state(self): state = { @@ -602,7 +602,14 @@ def test_with_state(self): ) self.assertTrue(isinstance(versions[0], int)) self.assertEqual(versions[0], versions[1]) - self.assertEqual(versions[1], 12345) + + # Based on state values provided check the number of record messages emitted + record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)] + incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-incremental'] + integer_incremental_record_messages = [m for m in record_messages if m.stream == 'dbo-integer_incremental'] + + self.assertEqual(len(incremental_record_messages),2) + self.assertEqual(len(integer_incremental_record_messages),1) class TestViews(unittest.TestCase): @@ -650,6 +657,76 @@ def test_do_not_discover_key_properties_for_view(self): self.assertEqual(primary_keys, {"a_table": ["id"], "a_view": []}) +class TestTimestampIncrementalReplication(unittest.TestCase): + def setUp(self): + self.conn = test_utils.get_test_connection() + + with connect_with_backoff(self.conn) as open_conn: + with open_conn.cursor() as cursor: + try: + cursor.execute("drop table incremental") + except: + pass + cursor.execute("CREATE TABLE incremental (val int, updated timestamp)") + cursor.execute("INSERT INTO incremental (val) VALUES (1)") #00000000000007d1 + cursor.execute("INSERT INTO incremental (val) VALUES (2)") #00000000000007d2 + cursor.execute("INSERT INTO incremental (val) VALUES (3)") #00000000000007d3 + + self.catalog = test_utils.discover_catalog(self.conn, {}) + + for stream in self.catalog.streams: + stream.metadata = [ + { + "breadcrumb": (), + "metadata": { + "selected": True, + "table-key-properties": [], + "database-name": "dbo", + }, + }, + {"breadcrumb": ("properties", "val"), "metadata": {"selected": True}}, + ] + + stream.stream = stream.table + test_utils.set_replication_method_and_key(stream, "INCREMENTAL", "updated") + + def test_with_no_state(self): + state = {} + + global SINGER_MESSAGES + SINGER_MESSAGES.clear() + + tap_mssql.do_sync(self.conn, test_utils.get_db_config(), self.catalog, state) + + (message_types, versions) = message_types_and_versions(SINGER_MESSAGES) + + record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)] + + self.assertEqual(len(record_messages),3) + + + def test_with_state(self): + state = { + "bookmarks": { + "dbo-incremental": { + "version": 1, + "replication_key_value": '00000000000007d2', + "replication_key": "updated", + }, + } + } + + global SINGER_MESSAGES + SINGER_MESSAGES.clear() + tap_mssql.do_sync(self.conn, test_utils.get_db_config(), self.catalog, state) + + (message_types, versions) = message_types_and_versions(SINGER_MESSAGES) + + # Given the state value supplied, there should only be two RECORD messages + record_messages = [message for message in SINGER_MESSAGES if isinstance(message,singer.RecordMessage)] + + self.assertEqual(len(record_messages),2) + class TestPrimaryKeyUniqueKey(unittest.TestCase): def setUp(self): self.conn = test_utils.get_test_connection() @@ -708,6 +785,7 @@ def test_only_primary_key(self): self.assertEqual(primary_keys["pk_only_table"], ["pk"]) self.assertEqual(primary_keys["pk_uc_table"], ["pk"]) + if __name__ == "__main__": # test1 = TestBinlogReplication() # test1.setUp()